DelayDeliveryWith Sending same message multiple time to the handler

There is an issue while processing the delayed messages using MSMQ Transport. When any message send with option DelayDeliveryWith causing an issue appearing the same command multiple times.

PlaceOrderNew => When we send this command with delay then appearing the same message multiple’s time.

Use SQL Db as Datastore.

Please find the sample code as below

Configuration:
static async Task Main()
{
Console.Title = “Samples.DelayedDelivery.Client”;
var endpointConfiguration = new EndpointConfiguration(“Samples.DelayedDelivery.Client”);

    var hibernateConfig = new NHibernate.Cfg.Configuration();
    hibernateConfig.DataBaseIntegration(x =>
    {
        x.ConnectionStringName = "NServiceBus/Persistence";
        x.Dialect<MsSql2012Dialect>();
    });
    hibernateConfig.SetProperty("default_schema", "dbo");
    endpointConfiguration.UsePersistence<NHibernatePersistence>().UseConfiguration(hibernateConfig); ;

    string errorQueue = "error";
    endpointConfiguration.SendFailedMessagesTo(errorQueue);
    endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();

    var messageStore = new SqlServerDelayedMessageStore(
                connectionString: ConfigurationManager.ConnectionStrings["NServiceBus/Persistence"].ConnectionString,
                schema: "dbo", tableName: "DelayMessageStoreTemp"); //optional, defaults to endpoint name with '.delayed' suffix
    var transport = new MsmqTransport
    {
        DelayedDelivery = new DelayedDeliverySettings(messageStore)
        {
            NumberOfRetries = 1,
            MaximumRecoveryFailuresPerSecond = 2,
            TimeToTriggerStoreCircuitBreaker = TimeSpan.FromSeconds(30),
            TimeToTriggerDispatchCircuitBreaker = TimeSpan.FromSeconds(30),
            TimeToTriggerFetchCircuitBreaker = TimeSpan.FromSeconds(30)
        }
    };
    var routing = endpointConfiguration.UseTransport(transport);
    endpointConfiguration.EnableInstallers();

    var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

    await SendOrder(endpointInstance).ConfigureAwait(false);

    await endpointInstance.Stop().ConfigureAwait(false);
}

static async Task SendOrder(IEndpointInstance endpointInstance)
{
    Console.WriteLine("Press '1' to send PlaceOrder - defer message handling");
    Console.WriteLine("Press '2' to send PlaceDelayedOrder - defer message delivery");
    Console.WriteLine("Press any other key to exit");

    while (true)
    {
        var key = Console.ReadKey();
        Console.WriteLine();
        var id = Guid.NewGuid();


        switch (key.Key)
        {
            case ConsoleKey.D1:
            case ConsoleKey.NumPad1:
                #region SendOrder
                var placeOrder = new PlaceOrder
                {
                    Product = "New shoes",
                    Id = id
                };
                await endpointInstance.Send("Samples.DelayedDelivery.Server", placeOrder)
                    .ConfigureAwait(false);
                Console.WriteLine($"[Defer Message Handling] Sent a PlaceOrder message with id: {id.ToString("N")}");
                #endregion
                continue;
            case ConsoleKey.D2:
            case ConsoleKey.NumPad2:
                #region DeferOrder
                var placeDelayedOrder = new PlaceDelayedOrder
                {
                    Product = "New shoes",
                    Id = id
                };
                var options = new SendOptions();

                options.SetDestination("Samples.DelayedDelivery.Server");
                options.DelayDeliveryWith(TimeSpan.FromSeconds(5));
                await endpointInstance.Send(placeDelayedOrder, options)
                    .ConfigureAwait(false);
                Console.WriteLine($"[Defer Message Delivery] Deferred a PlaceDelayedOrder message with id: {id.ToString("N")}");
                #endregion
                continue;
            default:
                return;
        }
    }

PlaceOrderHandler.cs

public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
    using (new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.Suppress))
    {
        if (ShouldMessageBeDelayed(message.Id))
        {
            var options = new SendOptions();

            options.DelayDeliveryWith(TimeSpan.FromSeconds(5));
            options.RouteToThisEndpoint();
            await context.Send(new PlaceOrderNew() { Id = message.Id, Product = message.Product }, options)
                .ConfigureAwait(false);
            log.Info($"[Defer Message Handling] Deferring Message with Id: {message.Id}");
            return;
        }

        log.Info($"[Defer Message Handling] Order for Product:{message.Product} placed with id: {message.Id}");
    }
}

public Task Handle(PlaceOrderNew message, IMessageHandlerContext context)
{
    using (new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.Suppress))
    {
        log.Info($"[Defer Message Handling] New Mewssagsldkfjsdl Message with Id: {message.Id}");
        return Task.CompletedTask;
    }
}

The transport transaction scope is using DTC to coordinate the receive operation and the send operation (as well as any other action you have in your handler that can participate).

This normally means if you have a retry for any reason both the received message consumption and outgoing message send operations will be rolled back.

When you suppress scope as you have done, the send will be dispatched immediately in a separate transaction and if the incoming message is rolled back for any reason (for instance a retry) this will result in duplicate messages.

Is there a reason you are suppressing transaction scope in these handlers?

EDIT: While the previous is true, it appears I may have spoke to soon about immediate dispatch. I still suspect the transaction scope is to blame.

Hi Bob,

Even after removing the Transaction Scope the behavior remain same.

The first line of your screenshot suggests that you send a PlaceDelayedOrder message, but there is no handler that handles this message in your sample code, so we have no idea what is happening in there that could explain the appearance of that same message multiple times.

Thanks for your reply. Nothing specific but just added log with information.

public class PlaceDelayedOrderHandler :
IHandleMessages
{
static ILog log = LogManager.GetLogger();

public Task Handle(PlaceDelayedOrder message, IMessageHandlerContext context)
{
    log.Info($"[Defer Message Delivery] Order for Product:{message.Product} placed with id: {message.Id}");
    return Task.CompletedTask;
}

}

Any clue how to resolve this as this is working as expected in NServicebus 7.x but not working as expected in NServicebus 8.x.

1 Like