I have set up a single event (and a couple of commands used in a saga). I have 2 web apps and 1 console app that register endpoints. Autosubscribe is turned off for all of them. One app declares a single handler for that event and manually subscribes to the event.
When I run the apps in containers and use sqlPersistence and RabbitMq transport the handler is called over and over again. When I run without containers and don’t explicitly set any persistence and use the LearningTransport, the code behaves as expected.
I do know that the handler will get called over and over again when an exception happens, but only up to the default number of retries (5?), and there are no exceptions happening in the handler, so I don’t think that is the problem.
When an event is published from the web app (the same issue occurs when sending a command to its handler), the handler is invoked over and over again for the same message: the message ID, correlation ID, conversation ID, time sent… everything is the same. I don’t know why, and I found no other reports of this on the internet.
I am using autofac as the container and NSB 7.0.1 and RabbitMq for the transport. All apps and persistence (sql server) and transport run in Docker containers.
For each app I set it up the same except I register the handler in only one, as seen below:
Thanks for any help!
/// <summary>
/// Configures and starts the NServiceBus endpoint (SQL persistence, RabbitMQ transport,
/// Autofac container), manually subscribes to <c>ArticleApproved</c>, registers the running
/// endpoint instance in the Autofac container and returns the container as the
/// application's service provider.
/// </summary>
/// <param name="services">The service collection supplied by the host.</param>
/// <returns>An Autofac-backed service provider.</returns>
public virtual IServiceProvider ConfigureServices(IServiceCollection services)
{
    //...
    var loggerFactory =
        container.Resolve<Microsoft.Extensions.Logging.ILoggerFactory>();
    var logFactory = LogManager.Use<MicrosoftLogFactory>();
    logFactory.UseMsFactory(loggerFactory);

    var endpointConfiguration = new EndpointConfiguration(applicationName);
    endpointConfiguration.EnableInstallers();
    endpointConfiguration.SendFailedMessagesTo("error");
    endpointConfiguration.AuditProcessedMessagesTo("audit");
    endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
    // Subscriptions are created explicitly below instead of automatically per handler.
    endpointConfiguration.DisableFeature<AutoSubscribe>();

    EnsureSqlDatabaseExists(persistenceConnectionString, container.Resolve<ILogger<IEndpointInstance>>());

    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    persistence.TablePrefix("Monrovo");
    var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
    dialect.Schema("receiver");
    persistence.ConnectionBuilder(() => new SqlConnection(persistenceConnectionString));

    var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
    transport.UseConventionalRoutingTopology();
    transport.ConnectionString(transportConnectionString);

    endpointConfiguration.UseContainer<AutofacBuilder>(customizations =>
    {
        customizations.ExistingLifetimeScope(container);
    });

    // This method must return IServiceProvider and therefore cannot be async, so the
    // asynchronous NServiceBus calls are awaited synchronously. GetAwaiter().GetResult()
    // surfaces the original exception instead of wrapping it in an AggregateException.
    ScriptRunner.Install(
        sqlDialect: new SqlDialect.MsSqlServer(),
        tablePrefix: "Monrovo",
        connectionBuilder: () => new SqlConnection(persistenceConnectionString),
        scriptDirectory: Path.GetDirectoryName(Assembly.GetEntryAssembly().Location) + @"/NServiceBus.Persistence.Sql/MsSqlServer",
        shouldInstallOutbox: true,
        shouldInstallSagas: true,
        shouldInstallSubscriptions: true,
        shouldInstallTimeouts: true).GetAwaiter().GetResult();

    var startableEndpoint = Endpoint.Create(endpointConfiguration).GetAwaiter().GetResult();

    // Keep the started instance in _endpointInstance: it is the object that is subscribed
    // and registered below (previously the started endpoint was stored in an unused local).
    _endpointInstance = startableEndpoint.Start().GetAwaiter().GetResult();
    _endpointInstance.Subscribe<ArticleApproved>().GetAwaiter().GetResult();

    var updateBuilder = new ContainerBuilder();
    updateBuilder.RegisterInstance(_endpointInstance)
        .As<IMessageSession>().As<IEndpointInstance>().SingleInstance();
    // There is currently no workaround from NServiceBus for this.
    // See https://github.com/Particular/NServiceBus/issues/4421
#pragma warning disable CS0618 // Type or member is obsolete
    updateBuilder.Update(container);
#pragma warning restore CS0618 // Type or member is obsolete

    return new AutofacServiceProvider(container);
}
/// <summary>
/// Best-effort bootstrap of the persistence database: creates the catalog named in
/// <paramref name="connectionString"/> if it does not exist, then creates the
/// <c>receiver</c> schema inside it if it does not exist.
/// </summary>
/// <param name="connectionString">Connection string whose Initial Catalog is the target database.</param>
/// <param name="logger">Logger used to report connection and schema-creation failures.</param>
private static void EnsureSqlDatabaseExists(string connectionString, ILogger<IEndpointInstance> logger)
{
    var builder = new SqlConnectionStringBuilder(connectionString);
    var originalCatalog = builder.InitialCatalog;

    // Same statement is used for both the "master" attempt and the fallback attempt.
    var createDatabaseSql =
        $"IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '{originalCatalog}')" +
        $" CREATE DATABASE [{originalCatalog}]; ";
    var createSchemaSql =
        "IF NOT EXISTS(SELECT 1 FROM sys.schemas WHERE name = 'receiver')" +
        "BEGIN EXEC sys.sp_executesql N'CREATE SCHEMA receiver;' END";

    builder.InitialCatalog = "master";
    var masterConnectionString = builder.ConnectionString;
    builder.InitialCatalog = originalCatalog;
    var catalogConnectionString = builder.ConnectionString;

    try
    {
        ExecuteSqlNonQuery(masterConnectionString, createDatabaseSql);
    }
    catch (SqlException)
    {
        // Need to handle this better. Locally, we need to use master first. In Azure this is not possible or necessary
        // TODO: Build a sql docker container with a database already created.
        try
        {
            ExecuteSqlNonQuery(catalogConnectionString, createDatabaseSql);
        }
        catch (SqlException innerException)
        {
            logger.LogCritical(innerException, $"Unable to connect to SQL Server. Check that {builder.DataSource} is available");
        }
    }

    try
    {
        ExecuteSqlNonQuery(catalogConnectionString, createSchemaSql);
    }
    catch (Exception schemaException)
    {
        // Schema creation stays best-effort (startup continues), but the failure is
        // logged rather than silently discarded.
        logger.LogWarning(schemaException, "Unable to ensure that the 'receiver' schema exists in database {Database}", originalCatalog);
    }
}

/// <summary>
/// Opens a connection with <paramref name="connectionString"/>, executes
/// <paramref name="commandText"/> as a non-query and disposes the command and connection.
/// </summary>
private static void ExecuteSqlNonQuery(string connectionString, string commandText)
{
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var command = connection.CreateCommand())
        {
            command.CommandText = commandText;
            command.ExecuteNonQuery();
        }
    }
}
/// <summary>
/// Message handler that reacts to the published event by sending a
/// <c>StartRFPSync</c> command to the local endpoint (which, per the original
/// comment, starts the saga).
/// </summary>
public class test : IHandleMessages<ArticleApproved>
{
    // NOTE(review): the parameter type (RfpApproved) does not match the message type declared
    // on IHandleMessages<ArticleApproved> — confirm which event this handler is meant for.
    /// <summary>
    /// Initially handles the published event and starts the saga.
    /// </summary>
    /// <param name="message">The received event; its <c>RfpId</c> is forwarded to the command.</param>
    /// <param name="context">Handler context used to send the follow-up command.</param>
    /// <returns>The task representing the local send operation.</returns>
    public Task Handle(RfpApproved message, IMessageHandlerContext context)
    {
        // Return the send task so that message processing only completes once the command
        // has been handed to the pipeline and any send failure is observed. Dropping the
        // task and returning Task.CompletedTask made the send fire-and-forget.
        return context.SendLocal(new StartRFPSync(message.RfpId));
    }
}