Hi @Mark_Phillips
Unfortunately no. The SQL connector works because there is SQL Server transport that allows connection/transaction sharing. Because there is no equivalent MongoDB transport, in order to achieve the same thing with MongoDB, you would have to use a concept similar to the Outbox.
Currently NServiceBus built-in Outbox can be used only in the context of processing a message, not when you have a HTTP request and want to store some data and send a message. To do something like this you would have to store your outgoing message yourself and then have some sort of background worker that would push them to your transport.
Okay. Thanks for the clarification.
Hi @SzymonPobiega I was going through the code of the NServiceBus.Connector.SqlServer and I’m wondering what is the purpose of the explicit TransportTransactionMode.SendsAtomicWithReceive setting?
As far as I remember this was to prevent Distributed Transaction when the “other transport” can attach to the transaction scope (e.g. another SQL Server or MSMQ). It also prevents Azure ServiceBus transport from complaining about TransactionScope being present on the thread.
Szymon
Thanks, I’ve tried to configure the connector in my project but it didn’t fit my architecture very well. The problem is that I have 2 endpoints inside a single web app, one is a send only endpoint I use from the controllers, the other is a worker endpoint that is hosting various handlers and both endpoints tend to publish the same event types.
After some trials and errors I’ve ended up implementing my own connector that works in a similar way but instead of a “full” sql endpoint + a router I’ve done it in a slightly different way: The web is using a send only endpoint with custom routing setup that makes every send/publish a unicast operation to the sql queue. A raw endpoint is reading from that queue and forwarding the messages to the correct destination.
Here is the code with a sample: GitHub - peto268/NServiceBus.WebOutbox
1 Like
I achieved a similar solution using Hangfire with SqlServer storage. Some snippets:
Create a MessageSession implementation which enqueues a job when a message should be sent:
public class HangfireOutboxMessageSession : IMessageSession
{
private readonly IBackgroundJobClient _hangfireJobClient;
public HangfireOutboxMessageSession(IBackgroundJobClient hangfireJobClient)
{
_hangfireJobClient = hangfireJobClient;
}
public Task Send(object message, SendOptions options)
{
_hangfireJobClient.Enqueue<IMessageSession>(
messageSession => messageSession.Send(message, options));
return Task.CompletedTask;
}
public Task Send<T>(Action<T> messageConstructor, SendOptions options)
{
_hangfireJobClient.Enqueue<IMessageSession>(
messageSession => messageSession.Send<T>(messageConstructor, options));
return Task.CompletedTask;
}
public Task Publish(object message, PublishOptions options)
{
_hangfireJobClient.Enqueue<IMessageSession>(
messageSession => messageSession.Publish(message, options));
return Task.CompletedTask;
}
public Task Publish<T>(Action<T> messageConstructor, PublishOptions publishOptions)
{
_hangfireJobClient.Enqueue<IMessageSession>(
messageSession => messageSession.Publish<T>(messageConstructor, publishOptions));
return Task.CompletedTask;
}
public Task Subscribe(Type eventType, SubscribeOptions options) => throw new NotImplementedException("Hangfire Outbox cannot subscribe Messages!");
public Task Unsubscribe(Type eventType, UnsubscribeOptions options) => throw new NotImplementedException("Hangfire Outbox cannot subscribe Messages!");
}
Change the IMessageSession DI Registration so that when a HttpContext is available, we use HangfireMessageSession, otherwise, we use NServiceBus Message Session. Also the SqlConnection should be shared between your DbContext and Hangfire, and there should be a TransactionScope over the whole thing (Unit of Work).
public static IServiceCollection AddHangfireOutboxAdapter(
this IServiceCollection services,
IConfiguration configuration)
{
var nsbMessageSessionImplementationFactory = services
.FirstOrDefault(d => d.ServiceType == typeof(IMessageSession))?
.ImplementationFactory;
if (nsbMessageSessionImplementationFactory == null)
throw new Exception("Hangfire Outbox must be registered after NServiceBus!");
services.Replace(new ServiceDescriptor(typeof(IBackgroundJobClient), sp =>
new BackgroundJobClient(new SqlServerStorage(
sp.GetRequiredService<WriteModelDbContext>().Database.GetDbConnection(),
sp.GetRequiredService<SqlServerStorageOptions>())),
ServiceLifetime.Scoped));
services.TryAddSingleton<IHttpContextAccessor, HttpContextAccessor>();
services.Replace(new ServiceDescriptor(typeof(IMessageSession), sp =>
{
if (sp.GetService<IHttpContextAccessor>()?.HttpContext == null)
return nsbMessageSessionImplementationFactory(sp);
return new HangfireOutboxMessageSession(
sp.GetRequiredService<IBackgroundJobClient>());
}, ServiceLifetime.Scoped));
return services;
}
Hi everyone,
We have updated the ASP.NET Core sample to show a better approach on how to deal with the transactional session integration into the ASP.NET Core pipeline.
The tweaked approach selectively opts in for transactional session by using parameter injection
[HttpGet]
public async Task<string> Get([FromServices] ITransactionalSession messageSession)
which makes sure the transactional session interaction only happens for routes that actually need it.
Regards,
Daniel