Sending messages with a custom TimeToBeReceived is not supported on transactional MSMQ

I use a separate endpoint for publishing events, but in doing so I’ve hit an impasse.

I use this method to create an endpoint:

public static async Task<IEndpointInstance> CreateMsmqBackendPublishEndpointInstance_(NServiceBusBackendPublishEndpointTypeEnum endpointType, Func<ICriticalErrorContext, Task> onCriticalError)
{
  var endpointConfiguration = new EndpointConfiguration(endpointType.ToString());

  var conventions = endpointConfiguration.Conventions();
  conventions.DefiningTimeToBeReceivedAs(type => TimeSpan.FromMinutes(5));

  endpointConfiguration.DefineCriticalErrorAction(onCriticalError);
  endpointConfiguration.EnableInstallers();
  endpointConfiguration.MakeInstanceUniquelyAddressable(endpointType.ToString());
    
  var recoverability = endpointConfiguration.Recoverability();
  recoverability.Delayed(d => { d.NumberOfRetries(0); });
  recoverability.Immediate(i => { i.NumberOfRetries(0); });

  endpointConfiguration.SendFailedMessagesTo($"{endpointType.ToString()}.Errors");
  endpointConfiguration.UsePersistence<InMemoryPersistence, StorageType.Timeouts>();
  endpointConfiguration.UsePersistence<MsmqPersistence, StorageType.Subscriptions>()
                       .SubscriptionQueue($"{endpointType.ToString()}.Subscriptions");

  endpointConfiguration.UseSerialization<NewtonsoftSerializer>();

  var transport = endpointConfiguration.UseTransport<MsmqTransport>();
  transport.Transactions(TransportTransactionMode.None);
  transport.UseNonTransactionalQueues();

  return await Endpoint.Start(endpointConfiguration);
}

Then in my handler for an upsert to the database I use this code:

public async Task Handle(ContactUpsertRequest message, IMessageHandlerContext context)
{
  log.Debug($"Upserting: {message.Contact}");

  try
  {
    IContactRepository repository = new ContactRepository();
    var response = repository.Upsert(message.Contact);
    if (response != null)
    {
      log.Debug($"Upserting: {message.Contact} succeeded");
      if (message.Contact.CompareTo(response) != 0)
      {
        await Program.Host.EndpointPublish.Publish(new ContactUpsertSuccessEvent(response));
      }

      await context.Reply(new ContactUpsertSuccessResponse(response));

      return;
    }

    var exceptionMessage = $"Upserting: {message.Contact} failed in repository";
    var exception = new Exception(exceptionMessage);
    log.Error(exceptionMessage, exception);
  }
  catch (Exception ex)
  {
    var exceptionMessage = $"Upserting: {message.Contact} failed";
    var exception = new Exception(exceptionMessage, ex);
    log.Error(exceptionMessage, exception);
  }
}

This part fails:

      if (message.Contact.CompareTo(response) != 0)
      {
        await Program.Host.EndpointPublish.Publish(new ContactUpsertSuccessEvent(response));
      }

When I try to use the publish endpoint within the transaction of the reply endpoint, I get the error in the title. I’ve tried to set the transport options on the event service, but since the transaction belongs to the reply service, that doesn’t work (obviously, but I had to try). I also tried the following in the handler:

      if (message.Contact.CompareTo(response) != 0)
      {
        using (new TransactionScope(TransactionScopeOption.Suppress))
        {
          await Program.Host.EndpointPublish.Publish(new ContactUpsertSuccessEvent(response));
        }
      }

Trying to “suppress” the transaction, but same result. :frowning:

Anyone have any ideas to help?

Hi

Why do you use a separate endpoint for publishing instead of just using the message processing context?

Also, using InMemoryPersistence for timeouts is a really dangerous idea. Messages that fail and go through delayed retries are stored as timeouts. If the timeout storage is in-memory and the endpoint process crashes, all these delayed retry messages are lost.

Szymon

The reason for the separate endpoint is that if I publish a lot of updates to, for instance, my master data endpoint (for example when an integration imports or updates items), it blocks requests from the GUI, such as asking for data from said endpoint.
So I tried splitting them up so I can publish updates and make requests at the same time. I would prefer one endpoint of course, but then I would like to be able to set a priority on messages to choose which messages need to be handled first.

The InMemoryPersistence is taken directly from the webpage: MSMQ Subscription Persistence • MSMQ Persistence • Particular Docs

The separate endpoint for publishing, however, is not the issue here. The issue is how to set the TTBR on an event. I get the same error when using just one endpoint. I can publish fine as long as it is not done inside a handler, or rather inside the handler’s transaction.

Hi

The outgoing messages (both publishes and replies) go directly to the target queue, so it does not really matter which endpoint does the sending or publishing. Given three endpoints A, B and C, if A sends a message to B, and B replies to A and also publishes a message that C is subscribed to, the reply goes directly to queue A and the published event goes to queue C. Endpoint B and its queue are not in any way blocked or involved in the sending/publishing process other than passing the message to the local instance of MSMQ.

So in other words, there is no performance difference here. Publishing using a separate endpoint is discouraged because it requires additional resources (to host that endpoint in memory) and also prevents passing the correlation and causation context from the message being processed to the event being published.

Hope it makes sense,
Szymon

So I guess the lag I’ve seen when publishing events posts hundreds or thousands of messages is more on the backend than on the service.

So back to the original question. How can I set the TTBR on my published messages? Every time I get into a handler’s transaction it fails…

The MSMQ transport has a bug that causes all messages within a single transaction to use the TTBR value of the first message sent in that transaction. The issue is described here.

What you can do is send your TTBR’ed message using dispatch consistency Isolated. This way you are telling NServiceBus to essentially send that message outside of the current transaction. The downside of this approach is that the message is going to be sent out even if the handler eventually fails.

var options = new SendOptions();
options.RequireImmediateDispatch();
await session.Send(new MyMessage(), options);
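
Since the question is about publishing from inside a handler, here is a minimal sketch of the same idea applied to a publish. It assumes that `RequireImmediateDispatch()` can be called on `PublishOptions` in the same way as on `SendOptions`, and that the event is published through the handler's own `context` rather than a separate endpoint instance. The message classes below are simplified placeholders standing in for the poster's types, not their real definitions.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Placeholder message types (assumption): the real classes carry a Contact object.
public class ContactUpsertRequest : IMessage
{
    public string Contact { get; set; }
}

public class ContactUpsertSuccessEvent : IEvent
{
    public string Contact { get; set; }
}

public class ContactUpsertHandler : IHandleMessages<ContactUpsertRequest>
{
    public async Task Handle(ContactUpsertRequest message, IMessageHandlerContext context)
    {
        // ... perform the upsert first, so anything that can fail has already run ...

        var options = new PublishOptions();
        // Ask NServiceBus to dispatch this event right away instead of
        // batching it with the handler's other outgoing messages.
        options.RequireImmediateDispatch();

        await context.Publish(
            new ContactUpsertSuccessEvent { Contact = message.Contact },
            options);
    }
}
```

The same trade-off applies as with the send above: the event leaves even if the handler fails afterwards.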

Is it OK in your system for these messages to be delivered even if the thing that they signalled didn’t actually happen?

I think that would work, as they are in an if statement and only get sent if the action is successful. Furthermore, I will push the sending to the end of the handler so everything that can fail should have been handled by that time…

Thank you for the suggested solution. I will give it a go.

Furthermore I will push the sending to the end of the handler so everything that can fail should have been handled at that time…

There is still a possibility (although very unlikely with MSMQ) that the handling fails when MSMQ is acknowledging reception of the incoming message. In that case the message will be retried later, while your outgoing message has already been sent out.

I thought of that, but I think for now it will be okay. Maybe I’ll reevaluate if an issue arises… :slight_smile: