'Azure.Messaging.ServiceBus.ServiceBusException' when trying to send messages

Hi

We are getting stuck in what seems like an endless message loop when trying to publish events using Azure Service Bus. Unfortunately, we are not able to recover from this other than by restarting the host process.

Any guidelines on how to mitigate this behavior?

Packages:

  • NServiceBus 8.0.0
  • NServiceBus.Transport.AzureServiceBus 3.0.0

TransportTransactionMode: ReceiveOnly
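
For reference, the transport transaction mode is configured roughly like this (a sketch; the connection string and endpoint name are placeholders, not our exact setup):

var transport = new AzureServiceBusTransport("<connection-string>");
transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;

var endpointConfiguration = new EndpointConfiguration("<endpoint-name>");
endpointConfiguration.UseTransport(transport);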

Exception:

Exception from Send.
Azure.Messaging.ServiceBus.ServiceBusException: The operation did not complete within the allocated time 00:00:30 for object [REMOVED].windows.net:5671. (ServiceTimeout). For troubleshooting information, see https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/TROUBLESHOOTING.md.
 ---> System.TimeoutException: The operation did not complete within the allocated time 00:00:30 for object [REMOVED].windows.net:5671.
   at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.ConnectAsyncResult.End(IAsyncResult result)
   at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.<>c.b__17_1(IAsyncResult r)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.CreateAndOpenConnectionAsync(Version amqpVersion, Uri serviceEndpoint, Uri connectionEndpoint, ServiceBusTransportType transportType, IWebProxy proxy, String scopeIdentifier, TimeSpan timeout, ServiceBusTransportMetrics metrics)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenSenderLinkAsync(String entityPath, String identifier, TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateLinkAndEnsureSenderStateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchInternalAsync(AmqpMessage batchMessage, TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchInternalAsync(IReadOnlyCollection`1 messages, TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.<>c.<b__21_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusSender.SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken)
   at NServiceBus.Transport.AzureServiceBus.MessageDispatcher.DispatchBatchForDestination(String destination, ServiceBusClient client, Transaction transaction, Queue`1 messagesToSend, CancellationToken cancellationToken) in //src/Transport/Sending/MessageDispatcher.cs:line 198
   at NServiceBus.Transport.AzureServiceBus.MessageDispatcher.Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken) in //src/Transport/Sending/MessageDispatcher.cs:line 99

Upon further inspection of the logs, we are seeing variations of the communications exceptions:

Exception from Send.
Azure.Messaging.ServiceBus.ServiceBusException: Connection timed out ErrorCode: TimedOut (ServiceCommunicationProblem). For troubleshooting information, see https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/TROUBLESHOOTING.md.

Exception from Send.
Azure.Messaging.ServiceBus.ServiceBusException: Creation of RequestResponseAmqpLink did not complete in 0 milliseconds. (ServiceTimeout). For troubleshooting information, see https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/TROUBLESHOOTING.md.

Good Day Christian

Thanks for reaching out. Sorry that you're experiencing these problems. Is this limited to a specific handler, or is it a more global problem?

Are you by any chance sending many messages from your handlers? Are those messages rather large? What tier are you using? We have introduced first-class batching support for sends to multiple destinations.
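
To make that concrete, by sends from handlers I mean something along these lines (an illustrative handler with placeholder types, not your code):

public class OrderAcceptedHandler : IHandleMessages<OrderAccepted>
{
    public async Task Handle(OrderAccepted message, IMessageHandlerContext context)
    {
        // Outgoing messages produced while handling an incoming message are dispatched
        // once the handler completes, and the transport can batch them per destination.
        foreach (var line in message.Lines)
        {
            await context.Publish(new OrderLineAccepted { OrderId = message.OrderId, LineId = line.Id });
        }
    }
}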

We have tested this with various message sizes and batch sizes across all supported transport transaction modes, so I wonder what could be different in your specific case.

Regards,
Daniel

Hi Daniel

Glad to hear from you :slight_smile:

The messages are sent from a Microsoft.Extensions.Hosting.BackgroundService hosted in an ASP.NET Core (.NET 7) application running as a console job.

The service queries a legacy database and uses the IMessageSession to emit up to 5,000 events per loop cycle.

Messages are not very large, and we are using the Premium tier with autoscaling up to 16 units.

Hope this answers most of your questions :wink:

/Christian

Christian,

Can you share some pseudo code? That would help me reproduce it. Are you by any chance wrapping the sends in a transaction scope? The sends should not automatically enlist in one, and those send operations should not be batched, so something is off.
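
For clarity, the kind of wrapping I'm asking about would look roughly like this (illustrative snippet; someEvent is a placeholder):

using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    // Publishes issued inside the scope would see an ambient System.Transactions transaction.
    await messageSession.Publish(someEvent);
    scope.Complete();
}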

Daniel

It roughly goes something like this:

public class EventMessagingService : BackgroundService
{
    private readonly IMessageSession messageSession;
    private readonly IEventsRepository eventRepository;
    private readonly ILogger<EventMessagingService> logger;

    public EventMessagingService(IMessageSession messageSession, IEventsRepository eventRepository, ILogger<EventMessagingService> logger)
    {
        this.messageSession = messageSession;
        this.eventRepository = eventRepository;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PublishEvents(stoppingToken);
                var queueSize = await eventRepository.GetNumberOfEventsInQueue();

                // Back off for a minute when the legacy event queue is drained.
                if (queueSize == 0)
                    await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
            catch (Exception exception)
            {
                logger.LogError(exception, $"Error caught in {nameof(EventMessagingService)}.{nameof(ExecuteAsync)}");
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
        }

        logger.LogDebug($"{nameof(EventMessagingService)} background task is stopping");
    }

    async Task PublishEvents(CancellationToken stoppingToken)
    {
        var items = await eventRepository.GetEvents(maxNumberOfEvents: 5000);

        if (!items.Any()) return;

        // Publish all events concurrently; the helper collects successes and failures.
        var (completed, failed) = await items.ParallelForEach(async item =>
        {
            PublishOptions publishOptions = new();
            publishOptions.SetHeader("SomePrefix.EventId", item.Id.ToString());

            await messageSession.Publish(item.ToRegisterValueDeletedEvent(), publishOptions);
        }, stoppingToken);

        if (completed.Any())
        {
            await eventRepository.DeleteEvent(completed.Select(x => x.Id)).ConfigureAwait(false);
        }
    }
}

public static class IEnumerableExtensions
{
    public static async Task<(IEnumerable<TInput> Completed, IEnumerable<(TInput Item, Exception Exception)> Failed)> ParallelForEach<TInput>(this IEnumerable<TInput> items, Func<TInput, Task> function, CancellationToken cancellationToken = default)
    {
        List<Task> tasks = new();

        var completed = new ConcurrentBag<TInput>();
        var failed = new ConcurrentBag<(TInput, Exception)>();

        foreach (var item in items)
        {
            // Each item runs on its own task; successes and failures are collected separately.
            var newTask = Task.Run(async () =>
            {
                try
                {
                    await function.Invoke(item);
                    completed.Add(item);
                }
                catch (Exception e)
                {
                    failed.Add((item, e));
                }
            }, cancellationToken);

            tasks.Add(newTask);
        }

        await Task.WhenAll(tasks);
        return (completed, failed);
    }
}
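
For comparison, a variant of the same helper that caps the number of in-flight publishes would look roughly like this (a sketch only, using SemaphoreSlim; this is not what we currently run):

// Sketch: an overload of ParallelForEach for the same IEnumerableExtensions class
// that limits concurrency instead of starting all operations at once.
public static async Task<(IEnumerable<TInput> Completed, IEnumerable<(TInput Item, Exception Exception)> Failed)> ParallelForEach<TInput>(
    this IEnumerable<TInput> items,
    Func<TInput, Task> function,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken = default)
{
    var completed = new ConcurrentBag<TInput>();
    var failed = new ConcurrentBag<(TInput, Exception)>();
    var throttle = new SemaphoreSlim(maxDegreeOfParallelism);

    var tasks = items.Select(async item =>
    {
        // Wait for a free slot; cancellation surfaces through Task.WhenAll below.
        await throttle.WaitAsync(cancellationToken);
        try
        {
            await function(item);
            completed.Add(item);
        }
        catch (Exception e)
        {
            failed.Add((item, e));
        }
        finally
        {
            throttle.Release();
        }
    }).ToList();

    await Task.WhenAll(tasks);
    return (completed, failed);
}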

But you have no transaction scope around this at all? Does the repository perhaps create some sort of ambient transaction?

No, this is not wrapped in any transaction scope. The repository is an Oracle database that we connect to using Devart.Data.Oracle, and as far as I can tell it does not create any ambient transactions.
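
One way we could double-check would be to log the ambient transaction right before the publish loop, something like this (sketch only; we have not added this yet):

// Sketch: System.Transactions.Transaction.Current is null when nothing has enlisted.
var ambient = System.Transactions.Transaction.Current;
if (ambient is not null)
{
    logger.LogWarning(
        "Ambient transaction detected before publish: {TransactionId}",
        ambient.TransactionInformation.LocalIdentifier);
}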

Hi Christian,

I have tried to reproduce it but couldn't get to the bottom of it. I tried sending messages of various sizes across multiple thousands of concurrent operations, including trying to reproduce a state in which I'm sending out messages while a transaction scope is active around the handling of an incoming message.

Could it be that your premium instance temporarily had some issues? I'm aware that Microsoft is rolling out the new local storage feature within their premium instances, so I wonder if that might have caused the communication issue.

Can you reliably reproduce it on your end? FYI, I'm not attempting to push the blame somewhere else. It could very well be that we have introduced a problem into the code, but I need some reliable way to reproduce it on our end to be able to triage it properly.

PS: I have tried it on both a standard and a premium namespace.

Regards,
Daniel

Hi Daniel

It could have been some internal ASB issues. We haven’t observed the exceptions in a while. :smiley:

I will try to see if I can reproduce the error on our subscription and get back to you.

Best regards,
Christian


Hi Christian,

I hope you had a good start to the new year!

Have you been able to reproduce it?

Regards,
Daniel

Hi Daniel

We have not been able to reproduce the exceptions and haven’t seen them in our production environment. Must have been a minor hiccup :slight_smile:

Cheers,
Christian
