Outbox in an ASP.NET Core scenario

Ah, that’s really nice, thanks! I’ll give it a go.

@SzymonPobiega I want to implement the outbox pattern in my web APIs and was looking into the NServiceBus.Connector.SqlServer package. Please correct me if I am wrong, but I don’t believe this solution supports multi-tenancy (which is something that I require for my implementation). Are you able to suggest some solutions that would support multi-tenancy? I have considered callbacks and SendLocal, but neither is an ideal solution. At this point, I am inclined to implement my own outbox table, but I would love to hear alternate solutions.

What type of multi-tenancy do you have? A shared queue/separate database?

We have a separate database per tenant.

I was able to leverage some of the built-in classes to write to the NServiceBus outbox table. I have a simple solution that contains this implementation here.

Could you please take a look whenever you have a chance and let me know if there are any issues with this approach? The main code files are UnitOfWorkFilter.cs, WebHostedOutboundMessageBehavior.cs and PendingOperationsDispatcher.cs.

I also intend to have a separate process to clean up the Outbox table as well as to reprocess any dispatch failures.

Hi @adeepak12

I’ve modified the way the SqlConnection is created in the connector sample (the one included in the repo) and it seems to work. Here’s the new code:

serviceCollection.AddScoped(serviceProvider =>
{
    var httpContext = serviceProvider.GetRequiredService<IHttpContextAccessor>().HttpContext;
    if (httpContext != null && httpContext.Request.Query.TryGetValue("tenant", out var tenant))
    {
        // Point the connection at the tenant-specific catalog
        return new SqlConnection(connectionString.Replace("initial catalog=connector", $"initial catalog=connector-{tenant}"));
    }
    return new SqlConnection(connectionString);
});

For simplicity, it just takes the tenant ID from the query string and uses it to build the connection string. Each tenant has its own catalog within the same instance of SQL Server. The router built into the connector is configured to use a shared catalog for messages, so what ends up happening when handling a request is a single transaction that includes storing data in the tenant catalog and sending a message to a queue table in a shared catalog. Because SQL Server supports atomic transactions that span multiple catalogs, it works fine.

What is not possible is having tenant catalogs in different SQL Server instances. You could overcome that by introducing a concept of sharding (multiple shards, each shard containing many tenants, each shard using a different SQL Server instance). That would require running a separate instance of the router for each shard. The Connector could be easily extended to allow for passing multiple connection strings and spinning up a router for each connection string.
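The shard mapping described above could be as simple as a two-level lookup. A minimal sketch (the `ShardMap` type and the shard/tenant names are hypothetical, not part of the Connector):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shard map: each tenant belongs to a shard, and each shard
// points at a different SQL Server instance via its own connection string.
class ShardMap
{
    readonly Dictionary<string, string> tenantToShard;
    readonly Dictionary<string, string> shardToConnectionString;

    public ShardMap(
        Dictionary<string, string> tenantToShard,
        Dictionary<string, string> shardToConnectionString)
    {
        this.tenantToShard = tenantToShard;
        this.shardToConnectionString = shardToConnectionString;
    }

    // Resolve tenant -> shard -> connection string.
    public string ConnectionStringFor(string tenant) =>
        shardToConnectionString[tenantToShard[tenant]];

    // One router instance would be spun up per shard connection string.
    public IEnumerable<string> ShardConnectionStrings =>
        shardToConnectionString.Values;
}
```

A separate Connector/router instance would then be started for each entry in `ShardConnectionStrings`.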

Hope it helps,
Szymon


Thank you @SzymonPobiega for sharing this! Our tenant databases are across different SQL Server instances. I will look into the sharding approach and running a separate instance of router per shard.

How many SQL Server instances do you have? Does each tenant have a separate instance?

Currently, we have 3 tenants, each with a dedicated SQL Server instance. We will likely be migrating to SQL Azure at some point down the road.

@adeepak12 for a small number of tenant databases (<10) the connector could be modified to handle multiple databases. That should be an easy change. For a larger number of tenant databases that would probably not be a good solution, because each tenant DB has to be polled for new messages. The number of SQL queries issued per second would grow with each tenant, regardless of whether that tenant is busy or not.

An alternative that would work with SQL Azure is Elastic Transactions. You can have one shared DB for the messages while each tenant has a separate data DB. If these DBs are linked via elastic transactions, you can modify the data and send a message in a single TransactionScope. For that to work the Connector would have to support the TransactionScope mode (another small change).
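The TransactionScope semantics can be sketched without a real database by enlisting two toy resources in one ambient transaction. This is only an illustration of the atomicity guarantee; in the real setup the two resources would be SqlConnections to the tenant data DB and the shared messages DB, linked via Elastic Transactions:

```csharp
using System;
using System.Threading;
using System.Transactions;

// Toy volatile resource that enlists in the ambient System.Transactions
// transaction. It stands in for a real database connection.
class ToyResource : IEnlistmentNotification
{
    public readonly ManualResetEventSlim Finished = new ManualResetEventSlim();
    public bool Committed;

    public ToyResource() =>
        Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);

    public void Prepare(PreparingEnlistment e) => e.Prepared();
    public void Commit(Enlistment e) { Committed = true; Finished.Set(); e.Done(); }
    public void Rollback(Enlistment e) { Finished.Set(); e.Done(); }
    public void InDoubt(Enlistment e) { Finished.Set(); e.Done(); }
}

static class ElasticTransactionSketch
{
    // Returns true when both "databases" committed as one atomic unit.
    public static bool ModifyDataAndSendMessage()
    {
        ToyResource tenantData, messages;
        using (var scope = new TransactionScope())
        {
            tenantData = new ToyResource(); // modify data in the tenant DB
            messages = new ToyResource();   // send a message via the shared DB
            scope.Complete();
        }
        tenantData.Finished.Wait();
        messages.Finished.Wait();
        return tenantData.Committed && messages.Committed;
    }
}
```

If `scope.Complete()` is never called, both enlisted resources get `Rollback` instead, which is exactly the all-or-nothing behavior the outbox needs.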

@SzymonPobiega Agreed, I don’t think this will scale beyond a small number of tenant DBs. At this point, I am inclined towards writing to the local outbox table. Thank you for your suggestions, I really appreciate it!

Is there support for a MongoDB transport so that an equivalent connector would be possible?

Hi @Mark_Phillips

Unfortunately no. The SQL connector works because the SQL Server transport allows connection/transaction sharing. Because there is no equivalent MongoDB transport, to achieve the same thing with MongoDB you would have to use a concept similar to the Outbox.

Currently the NServiceBus built-in Outbox can be used only in the context of processing a message, not when you have an HTTP request and want to store some data and send a message. To do something like this you would have to store your outgoing messages yourself and then have some sort of background worker push them to your transport.
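A minimal sketch of that hand-rolled idea (in-memory here for illustration; in real code `Store` would write to an outbox collection/table inside the same MongoDB transaction as the business data, and `Dispatch` would run in a hosted background service):

```csharp
using System;
using System.Collections.Concurrent;

// Hand-rolled outbox sketch: the HTTP request stores the outgoing message
// together with its data change; a background worker later pushes pending
// messages to the transport.
class HandRolledOutbox
{
    readonly ConcurrentQueue<string> pending = new ConcurrentQueue<string>();

    // Called from the request handler, inside the business transaction.
    public void Store(string messageBody) => pending.Enqueue(messageBody);

    // Called periodically by the background worker; returns how many
    // messages were handed to the transport in this pass.
    public int Dispatch(Action<string> sendToTransport)
    {
        var dispatched = 0;
        while (pending.TryDequeue(out var body))
        {
            sendToTransport(body);
            dispatched++;
        }
        return dispatched;
    }
}
```

The real version also needs to mark records as dispatched (rather than delete them immediately) so that a crash between send and mark only causes duplicates, never loss.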

Okay. Thanks for the clarification.

Hi @SzymonPobiega I was going through the code of the NServiceBus.Connector.SqlServer and I’m wondering what is the purpose of the explicit TransportTransactionMode.SendsAtomicWithReceive setting?

As far as I remember, this was to prevent a distributed transaction when the “other transport” can attach to the transaction scope (e.g. another SQL Server or MSMQ). It also prevents the Azure Service Bus transport from complaining about a TransactionScope being present on the thread.
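For reference, that mode is set explicitly on the transport via the standard NServiceBus configuration API (the endpoint name here is illustrative):

```csharp
var endpointConfiguration = new EndpointConfiguration("Connector");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();

// Receive and any sends share one transaction, without escalating to DTC.
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
```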

Szymon

Thanks. I’ve tried to configure the connector in my project but it didn’t fit my architecture very well. The problem is that I have two endpoints inside a single web app: one is a send-only endpoint I use from the controllers, the other is a worker endpoint hosting various handlers, and both endpoints tend to publish the same event types.
After some trial and error I’ve ended up implementing my own connector that works in a similar way, but instead of a “full” SQL endpoint plus a router I’ve done it slightly differently: the web app uses a send-only endpoint with a custom routing setup that makes every send/publish a unicast operation to the SQL queue. A raw endpoint reads from that queue and forwards the messages to the correct destination.
Here is the code with a sample: GitHub - peto268/NServiceBus.WebOutbox


I achieved a similar solution using Hangfire with SqlServer storage. Some snippets:

Create a MessageSession implementation which enqueues a job when a message should be sent:

public class HangfireOutboxMessageSession : IMessageSession
{
    private readonly IBackgroundJobClient _hangfireJobClient;

    public HangfireOutboxMessageSession(IBackgroundJobClient hangfireJobClient)
    {
        _hangfireJobClient = hangfireJobClient;
    }

    public Task Send(object message, SendOptions options)
    {
        _hangfireJobClient.Enqueue<IMessageSession>(
            messageSession => messageSession.Send(message, options));

        return Task.CompletedTask;
    }

    public Task Send<T>(Action<T> messageConstructor, SendOptions options)
    {
        _hangfireJobClient.Enqueue<IMessageSession>(
            messageSession => messageSession.Send<T>(messageConstructor, options));

        return Task.CompletedTask;
    }

    public Task Publish(object message, PublishOptions options)
    {
        _hangfireJobClient.Enqueue<IMessageSession>(
            messageSession => messageSession.Publish(message, options));

        return Task.CompletedTask;
    }

    public Task Publish<T>(Action<T> messageConstructor, PublishOptions publishOptions)
    {
        _hangfireJobClient.Enqueue<IMessageSession>(
            messageSession => messageSession.Publish<T>(messageConstructor, publishOptions));

        return Task.CompletedTask;
    }

    public Task Subscribe(Type eventType, SubscribeOptions options) => throw new NotImplementedException("Hangfire Outbox cannot subscribe to messages!");

    public Task Unsubscribe(Type eventType, UnsubscribeOptions options) => throw new NotImplementedException("Hangfire Outbox cannot unsubscribe from messages!");
}

Change the IMessageSession DI registration so that when an HttpContext is available we use HangfireOutboxMessageSession; otherwise we use the NServiceBus message session. The SqlConnection should also be shared between your DbContext and Hangfire, and there should be a TransactionScope over the whole thing (unit of work).

public static IServiceCollection AddHangfireOutboxAdapter(
    this IServiceCollection services,
    IConfiguration configuration)
{

    var nsbMessageSessionImplementationFactory = services
        .FirstOrDefault(d => d.ServiceType == typeof(IMessageSession))?
        .ImplementationFactory;

    if (nsbMessageSessionImplementationFactory == null)
        throw new Exception("Hangfire Outbox must be registered after NServiceBus!");

    services.Replace(new ServiceDescriptor(typeof(IBackgroundJobClient), sp =>
        new BackgroundJobClient(new SqlServerStorage(
            sp.GetRequiredService<WriteModelDbContext>().Database.GetDbConnection(),
            sp.GetRequiredService<SqlServerStorageOptions>())),
        ServiceLifetime.Scoped));

    services.TryAddSingleton<IHttpContextAccessor, HttpContextAccessor>();

    services.Replace(new ServiceDescriptor(typeof(IMessageSession), sp =>
    {
        if (sp.GetService<IHttpContextAccessor>()?.HttpContext == null)
            return nsbMessageSessionImplementationFactory(sp);

        return new HangfireOutboxMessageSession(
            sp.GetRequiredService<IBackgroundJobClient>());
    }, ServiceLifetime.Scoped));

    return services;
}

Hi everyone,

We have released outbox support for ASP.NET Core scenarios with the transactional session package.

See for example this sample showing how to integrate SQL persistence with Entity Framework in ASP.NET Core.

Regards,
Daniel


Hi everyone,

We have updated the ASP.NET Core sample to show a better approach to integrating the transactional session into the ASP.NET Core pipeline.

The tweaked approach selectively opts in to the transactional session by using parameter injection:

[HttpGet]
public async Task<string> Get([FromServices] ITransactionalSession messageSession)

which makes sure the transactional session interaction only happens for routes that actually need it.
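Inside such an action the session is opened, business data is modified, and the session is committed. A hedged sketch of what the action body could look like, based on the transactional session API (the options type depends on the configured persistence, here assumed to be SQL persistence; `MyMessage` is a hypothetical message type):

```csharp
[HttpGet]
public async Task<string> Get([FromServices] ITransactionalSession messageSession)
{
    // Open the session; the options type matches the persistence in use.
    await messageSession.Open(new SqlPersistenceOpenSessionOptions());

    // ... modify business data via the shared synchronized storage session ...

    await messageSession.SendLocal(new MyMessage()); // hypothetical message

    // Commit stores the outgoing messages atomically with the data changes;
    // a background mechanism then dispatches them to the transport.
    await messageSession.Commit();
    return "Ok";
}
```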

Regards,
Daniel