Homegrown outbox (store and forward) for send-only endpoints

We’ve been following Szymon’s suggestion of using the SQL Server transport plus NServiceBus.Router to make database operations and message sending atomic in “send-only” endpoints. It works like a charm, but the setup is a bit complex: multiple transports and error/poison queues.

Decided to make an attempt to mimic NServiceBus’ Outbox feature, but for send only endpoints. The first indication of this not being a great idea is that it probably would have already existed if it were straightforward to implement. Anyways, I wanted to get some opinions on whether or not this is worth pursuing.

I hacked together something that “works” (all my tests are green at least):

  • A Behavior<IOutgoingLogicalMessageContext> that looks for a transaction - either a System.Data.Common.DbTransaction passed in via Send/PublishOptions or an ambient System.Transactions.Transaction.
    • If a transaction is found, use PendingTransportOperations to capture the outgoing TransportOperations and store them in an outbox table (within the same transaction).
  • A FeatureStartupTask spawning a thread that polls the outbox table and uses IDispatchMessages to dispatch the TransportOperations directly to the transport before marking them as dispatched.

Any gotchas to this approach? Is it viable?

Hi Kato

Thanks for sharing your ideas here in the forum. Yes, that is definitely a viable approach.

One drawback of the feature startup task approach, though, is that once you scale out the endpoint with the feature enabled, every instance polls the same outbox table and you run into competing consumer problems. Making the forwarder a dedicated external worker process might make more sense.
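If the forwarder does stay inside the endpoint, one common way to let several instances poll the same table on SQL Server is to claim rows with the UPDLOCK and READPAST table hints, so each poller skips rows another poller has already locked. The following is only a rough sketch under assumptions: the table dbo.OutboxMessages, its columns, the OutboxForwarder class, and the dispatch delegate are all hypothetical names, not part of NServiceBus.

```csharp
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Threading.Tasks;

public static class OutboxForwarder
{
    // Hypothetical schema: dbo.OutboxMessages(MessageId, Operations, Dispatched, CreatedAt).
    // UPDLOCK keeps the selected rows locked until the transaction ends;
    // READPAST makes concurrent pollers skip locked rows instead of blocking on them.
    public const string ClaimSql = @"
SELECT TOP (@batchSize) MessageId, Operations
FROM dbo.OutboxMessages WITH (UPDLOCK, READPAST, ROWLOCK)
WHERE Dispatched = 0
ORDER BY CreatedAt;";

    public const string MarkSql =
        "UPDATE dbo.OutboxMessages SET Dispatched = 1 WHERE MessageId = @messageId;";

    // Claims up to batchSize pending rows, dispatches each one, marks it as dispatched,
    // and commits. Expects an already open connection. Returns the number of rows handled.
    public static async Task<int> ForwardBatch(
        DbConnection connection, int batchSize, Func<string, byte[], Task> dispatch)
    {
        using (var transaction = connection.BeginTransaction())
        {
            var claimed = new List<(string Id, byte[] Operations)>();

            using (var claim = connection.CreateCommand())
            {
                claim.Transaction = transaction;
                claim.CommandText = ClaimSql;
                AddParameter(claim, "@batchSize", batchSize);

                using (var reader = await claim.ExecuteReaderAsync().ConfigureAwait(false))
                {
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    {
                        claimed.Add((reader.GetString(0), (byte[])reader.GetValue(1)));
                    }
                }
            }

            foreach (var (id, operations) in claimed)
            {
                // Hypothetical hook: hand the stored operations to the transport here.
                await dispatch(id, operations).ConfigureAwait(false);

                using (var mark = connection.CreateCommand())
                {
                    mark.Transaction = transaction;
                    mark.CommandText = MarkSql;
                    AddParameter(mark, "@messageId", id);
                    await mark.ExecuteNonQueryAsync().ConfigureAwait(false);
                }
            }

            transaction.Commit();
            return claimed.Count;
        }
    }

    static void AddParameter(DbCommand command, string name, object value)
    {
        var parameter = command.CreateParameter();
        parameter.ParameterName = name;
        parameter.Value = value;
        command.Parameters.Add(parameter);
    }
}
```

Note that this gives at-least-once delivery: if the process dies after a dispatch but before the commit, the row is picked up and dispatched again, so receivers still need to tolerate duplicates.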

Currently, you would also need to do a bit of hackery if you want to clear the pending transport operations in NServiceBus. We did something similar for the CosmosDB persister.

https://github.com/Particular/NServiceBus.Persistence.CosmosDB/blob/release-1.0/src/NServiceBus.Persistence.CosmosDB/Outbox/LogicalOutboxBehavior.cs#L27-L32

If you want, feel free to share the code with us, and we might be able to have a brief look at it and give you some more concrete inputs.

Regarding polling the table: there is an interesting approach that Debezium uses, based on change data capture (CDC) in SQL Server.

Regards
Daniel

Thanks for the feedback, Daniel! Much appreciated!

Yes, I forgot to think about scaling. An external process definitely makes the most sense. However, it is more intrusive with regard to application deployment. I was really hoping some kind of database trickery could be a solution: something that prevents the forwarder from starting in more than one endpoint. But that might be brittle.
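One form such database trickery could take on SQL Server is an application lock taken with sp_getapplock: the first endpoint instance to get the lock runs the forwarder, and the others skip it. This is a minimal sketch under assumptions, not a tested design; the ForwarderLock class and the resource name are hypothetical, and the connection is assumed to be a dedicated one that stays open for as long as the forwarder runs.

```csharp
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

public static class ForwarderLock
{
    // sp_getapplock returns 0 or 1 when the lock is granted and a negative value otherwise.
    public static bool IsGranted(int status) => status >= 0;

    // Tries to take an exclusive, session-owned application lock without waiting.
    // The connection must already be open; the lock is tied to that session.
    public static async Task<bool> TryAcquire(DbConnection openConnection, string resource)
    {
        using (var command = openConnection.CreateCommand())
        {
            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "sp_getapplock";
            Add(command, "@Resource", resource);
            Add(command, "@LockMode", "Exclusive");
            Add(command, "@LockOwner", "Session");
            Add(command, "@LockTimeout", 0); // 0 = fail immediately if another session holds it

            var status = command.CreateParameter();
            status.ParameterName = "@ReturnValue";
            status.DbType = DbType.Int32;
            status.Direction = ParameterDirection.ReturnValue;
            command.Parameters.Add(status);

            await command.ExecuteNonQueryAsync().ConfigureAwait(false);
            return IsGranted((int)status.Value);
        }
    }

    static void Add(DbCommand command, string name, object value)
    {
        var parameter = command.CreateParameter();
        parameter.ParameterName = name;
        parameter.Value = value;
        command.Parameters.Add(parameter);
    }
}
```

The brittleness mentioned above shows up here too: if the lock holder loses its connection, the lock is gone, so the forwarder would have to notice that and stop, while the other instances would need to retry periodically to take over.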

Not sure if I have to clear the pending transport operations? I have assumed that PendingTransportOperations is only set for endpoints with a receive/incoming pipeline. Hence, I implemented the behavior like this:

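// Send-only endpoint: assume no PendingTransportOperations exists yet,
// so install one to capture the outgoing operations.
var pendingTransportOperations = new PendingTransportOperations();
context.Extensions.Set(pendingTransportOperations);

// Run the rest of the outgoing pipeline; the operations end up in the instance above.
await next().ConfigureAwait(false);

context.Extensions.Remove<PendingTransportOperations>();

// Store the captured operations in the outbox table, using the transaction
// found earlier in the behavior.
var outboxTransportOperations = pendingTransportOperations.Operations
    .ToOutboxOperations();

await _outboxStorage
    .Store(
        new OutboxMessage(context.MessageId, outboxTransportOperations),
        transaction)
    .ConfigureAwait(false);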

CDC looks cool, but a little complex. I might look into it at a later stage. Thanks for the pointer, though 🙂

Hi @KatoStoelen ,

We have released outbox support for ASP.NET Core scenarios with the transactional session package.

See, for example, this sample showing how to integrate SQL persistence with Entity Framework in ASP.NET Core.

Regards,
Daniel

Great stuff!

Thanks for the heads up, @danielmarbach, will make sure to check it out.

Br,
Kato

Hi everyone,

We have updated the ASP.NET Core sample to show a better way of integrating the transactional session into the ASP.NET Core pipeline.

The tweaked approach selectively opts in to the transactional session by using parameter injection:

[HttpGet]
public async Task<string> Get([FromServices] ITransactionalSession messageSession)

which makes sure the transactional session interaction only happens for routes that actually need it.

Regards,
Daniel