Endpoint hosted in AKS using Azure Service Bus transport becomes unresponsive

I’m currently working on building a process to loop over millions of records within a database and execute some set of business logic against each record in the database. I’m anticipating this process to take several hours so I’m trying to make the process restartable. My current design is to use a saga, push out a command that represents a smaller batch (5,000 ish) of records to process and reply to the saga letting it know I’m done. We are currently using a Linux docker image hosted within Azure Kubernetes Services (AKS) to handle executing this logic since the database storing the records is within Azure SQL. We are also using Azure Service Bus as the transport.

This design works great for a little while but after about an hour the code executing within AKS becomes unresponsive. There is nothing being logged to Application Insights or AKS logs. I can see that the process was in the middle of the handler when the logs stop being created. I hooked up a heartbeat to push messages to a random queue that wasn’t being monitored and we are still receiving heart beat messages from the endpoint.

We aren’t sure if the breakdown has to do with AKS, Azure Service Bus transport or even somewhere within the execution of our business logic. We are wondering if anyone else has experience this or has ideas on where we could look next to figure out what is going on.

Here are some things we have tried/noticed

  • We noticed that periodically the endpoint would seem to stop running in the middle of handling a message, no exception would be logged and then some increment of 5 minutes later it would start processing the same message again. This makes us believe the root issue we are having could have something to do with lock duration and/or renewals within Azure Service Bus
  • We increased the LogLevel to Trace to see if we could see any additional logs within Application Insights and/or AKS
  • We reduced the batch size down to 5 records to guarantee that the handler would not need to lock the message for more than 30 seconds. Based on some other posts we noticed there could be some issue with having our handler run for too long
  • We set the prefetch count down to 1 to minimize how many messages are being pulled back at one time from Azure Service Bus
  • We implemented the Critical Error Action to make sure our application exited well on any critical errors
  • We reduced the number of pods down to 1 running within AKS.
  • We tried running the application locally on our machines from Visual Studio but the application ran relatively fine minus some locking issues with Azure Service Bus that recovered automatically

Here is some code

  • Setting up IOC Container
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
            .UseConsoleLifetime()
            .ConfigureLogging(loggingBuilder =>
 
            .....

            .AddNserviceBusService(Configuration["NServiceBus:EndpointName"],
                endpointConfiguration =>
                {
                    endpointConfiguration.DefineCriticalErrorAction(OnCriticalError);
                    endpointConfiguration.UseAzureServiceBus(Configuration["NServiceBus:AzureServiceBusConnectionString"], 
                                                             out var azureServiceBusTransport, 
                                                             Configuration["NServiceBus:AzureServiceBusTopicName"], 
                                                             heartbeatEndpoint: Configuration["NServiceBus:ErrorQueueName"],
                                                             prefetchCount: 1);

                    endpointConfiguration.UseAzureSqlPersistence(Configuration["EventDatabase:ServerName"], Configuration["EventDatabase:DatabaseName"], "Processing");
                    }, errorQueueName: Configuration["NServiceBus:ErrorQueueName"], auditQueueName: Configuration["NServiceBus:AuditQueueName"]
                );

  • Code behind AddNserviceBusService extension method
public static IHostBuilder AddNserviceBusService(this IHostBuilder hostBuilder,
                                                 string endpointName,
                                                 Action<EndpointConfiguration> endpointDelegate,
                                                 bool enableErrorQueue = true,
                                                 bool enableAuditQueue = true,
                                                 string errorQueueName = "error",
                                                 string auditQueueName = "audit",
                                                 bool enableLogging = true)
{
    hostBuilder
        .UseNServiceBus(hostBuilderContext =>
        {
            var endpointConfiguration = new EndpointConfiguration(endpointName);

            if (enableErrorQueue)
            {
                endpointConfiguration.SendFailedMessagesTo(errorQueueName);
            }
            if (enableAuditQueue)
            {
                endpointConfiguration.AuditProcessedMessagesTo(auditQueueName);
            }
            if (enableLogging)
            {
                endpointConfiguration.EnableLogging();
            }

            var settings = new JsonSerializerSettings
            {
                Converters = { new StringEnumConverter() }
            };
            var serialization = endpointConfiguration.UseSerialization(new NewtonsoftSerializer());
            serialization.Settings(settings);

            //do all the config that the caller wants to here
            endpointDelegate(endpointConfiguration);

            return endpointConfiguration;
        });

    return hostBuilder;
}
  • Code behind UseAzureServiceBus extension method
public static EndpointConfiguration UseAzureServiceBus(this EndpointConfiguration config, 
                                                       string connectionString, 
                                                       out TransportExtensions<AzureServiceBusTransport> transportDefinition,
                                                       string topicName = "nservicebus-topic",
                                                       string heartbeatEndpoint = null,
                                                       string metricsEndpoint = null,
                                                       int heartbeatFrequency = 20,
                                                       int metricsInterval = 50,
                                                       int? prefetchCount = null)
{
    config.ConfigureHeartbeatEndpoint(heartbeatEndpoint, heartbeatFrequency)
          .ConfigureMetricsEndpoint(metricsEndpoint, metricsInterval);

    var transport = config.UseTransport<AzureServiceBusTransport>();
    transport.CustomTokenProvider(new ManagedIdentityTokenProvider());
    transport.ConnectionString(connectionString);
    transport.TopicName(topicName);
    if (prefetchCount.HasValue)
    {
        transport.PrefetchCount(prefetchCount.Value);
    }

    transportDefinition = transport;

    return config;
}

If the lock duration expires another message will pick it up. This limits the processing time for any given message to a total of 5 minutes.

It sounds like you may have a blocking call in your handler. Can you add some fine-grained logging/tracing to discover what it is?

Thank you for looking in to my post and your suggestion. After I posted this we started breaking our handlers down to not contain any business logic and re-introduced small portions at a time. We found that our SQL connection management wasn’t up to par and once we resolved this we were able to successfully complete the processing with our design.