How to correctly configure a multiple queue scenario and have DI still work

Hello, I’m trying to configure a somewhat complex scenario and having trouble with the dependency injection with my IHandleMessages<> handlers.

The scenario is this. We have a highly scaled service that defines a saga, and we needed to put a singleton throttler in front of saga creation because millions of sagas were being created in quick succession and stair-stepping their way through the process (i.e. millions going through step 1, then millions going through step 2, etc., with the first saga not completing for hours). By throttling the rate of saga starts, the intention was that each set of sagas would have a chance to complete before the next set started, so we’d get a steady rate of completion instead of nothing for several hours.

So to get this working we have 2 separate projects: the main service defines an NSB queue and topic that it uses for the saga, but the saga start message is sent from the throttler service. The way we configured this on the throttler is that we defined an endpoint for the throttler with its own queue/topic name (because we didn’t want it consuming all the saga messages from the main service!) but then used transport.Routing().RouteToEndpoint(typeof(sagastart), MainServiceQueueName) to route the NSB messages to the queue owned by the main service. The endpoint configuration itself uses the throttler’s queue and topic, which at the time we built this didn’t have any traffic. This scenario is working as designed.

Fast forward to now. We need to use this same singleton throttler application to take over one specific step in the saga that isn’t behaving well in the massively multithreaded environment, so we can take advantage of batching/buffering. The details of what it’s doing aren’t really relevant here, because right now it’s not doing anything. I’m just trying to get the plumbing together so that I can consume a specific ICommand on the throttler’s queue that will eventually be sent by the main service saga (for dev purposes, a test service inside the throttler sends it right now). The desired flow is:
a) throttler starts saga on main service’s queue
b) main service does a whole lot of stuff then sends a specific command to throttler’s queue
c) throttler handles the command then publishes to main service’s topic a success/failure event (which gets handled by same saga handler that currently handles it today). So basically, I’m injecting an “external” step into the middle of the saga.
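
Step (c) could look roughly like the sketch below on the throttler side. This is only an illustration under assumptions, not the actual handler: PublishAssignmentsHandler and AssignmentsPublished are names that appear in this post, while the command name PublishAssignments is hypothetical (the command type is never named), and ITest stands in for whatever dependency the handler takes through its constructor.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical command name; the post does not name the command type.
public class PublishAssignments : ICommand { }

// Event sent back to the saga on completion; properties omitted.
public class AssignmentsPublished : IEvent { }

// Stand-in for any constructor-injected dependency.
public interface ITest { }

public class PublishAssignmentsHandler : IHandleMessages<PublishAssignments>
{
    private readonly ITest _test;

    // NServiceBus asks the container to build the handler, so ITest must be
    // registered in the container the endpoint was configured with.
    public PublishAssignmentsHandler(ITest test) => _test = test;

    public async Task Handle(PublishAssignments message, IMessageHandlerContext context)
    {
        // The batching/buffering work would happen here.

        // Publish the result event so the main service's saga can continue.
        await context.Publish(new AssignmentsPublished());
    }
}
```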

But what is actually happening when I build out the plumbing in the throttler is that my IHandleMessages handler can’t resolve any dependencies (except the Serilog logger), but non-NSB classes can (and it doesn’t matter if the dependency is a super simple class that does nothing or whether it has dependencies of its own – the ITest in the example below is a trivial interface with trivial implementation that I used to rule out missing dependencies in the ‘real’ classes I’m injecting):

[Warning][2021/06/29 03:04:22.976][Thread 27][Delayed Retry will reschedule message '278eed4a-5dee-4f1f-bce8-ad56014ac1f6' after a delay of 00:00:05 because of an exception:]
System.InvalidOperationException: Unable to resolve service for type 'Signify.Matchmaker.Streams.Cache.ITest' while attempting to activate 'Signify.Matchmaker.Streams.NSB.PublishAssignmentsHandler'.
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.CreateArgumentCallSites(Type serviceType, Type implementationType, CallSiteChain callSiteChain, ParameterInfo[] parameters, Boolean throwIfCallSiteNotFound)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.CreateConstructorCallSite(ResultCache lifetime, Type serviceType, Type implementationType, CallSiteChain callSiteChain)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.TryCreateExact(ServiceDescriptor descriptor, Type serviceType, CallSiteChain callSiteChain, Int32 slot)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.TryCreateExact(Type serviceType, CallSiteChain callSiteChain)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.CreateCallSite(Type serviceType, CallSiteChain callSiteChain)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.<>c__DisplayClass7_0.b__0(Type type)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteFactory.GetCallSite(Type serviceType, CallSiteChain callSiteChain)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.CreateServiceAccessor(Type serviceType)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at NServiceBus.Extensions.DependencyInjection.ServiceProviderAdapter.ChildScopeAdapter.Build(Type typeToBuild)
   at NServiceBus.Extensions.DependencyInjection.ContainerAdapter`1.ChildContainerAdapter.Build(Type typeToBuild)
   at NServiceBus.CommonObjectBuilder.Build(Type typeToBuild)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 45
   at CurrentSessionBehavior.Invoke(IIncomingLogicalMessageContext context, Func`1 next) in /_/src/SqlPersistence/SynchronizedStorage/CurrentSessionBehavior.cs:line 18
   at NServiceBus.ScheduledTaskHandlingBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Scheduling/ScheduledTaskHandlingBehavior.cs:line 22
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 33
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 37
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 35
   at NServiceBus.Transport.AzureServiceBus.MessagePump.InnerProcessMessage(Task`1 receiveTask)

The following is what my NSB configuration currently looks like. Note that I only have one endpoint explicitly being created, but perhaps I need two given that there are multiple queues and topics? I’m not sure. The code sits in a ServiceCollectionExtensions class referenced by Program.cs:

var nsbConfig = new ServiceBusConfig();
config.GetSection("ServiceBus").Bind(nsbConfig);

var endpointName = nsbConfig.QueueName; //This is the throttler's queue name

var transportConnectionString = config.GetConnectionString("AzureServiceBus");
var connectionString = config.GetConnectionString("MatchmakerDB");

var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();


// Transport
var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
transport.ConnectionString(transportConnectionString);
// Note: this is the main service's queue name, and the message is an ICommand.
transport.Routing().RouteToEndpoint(typeof(StartMemberProviderSaga),
    nsbConfig.MatchmakerServiceQueueName);
// Note: this is the main service's topic name, and the message is an IEvent.
// It will be the message sent back to the saga after successful completion of the PublishAssignmentsHandler.
transport.Routing().RouteToEndpoint(typeof(AssignmentsPublished),
    nsbConfig.MatchmakerServiceTopicName);

// If this is not specified, NSB uses default "bundle-1" topic name
transport.TopicName(nsbConfig.TopicName); // This is throttler's topic name, currently nothing publishes to it.

// Errors and recovery
endpointConfiguration.SendFailedMessagesTo($"{endpointName}.error");
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(
    immediate =>
    {
        immediate.NumberOfRetries(nsbConfig.ImmediateRetryCount);
    });

recoverability.Delayed(
    delayed =>
    {
        delayed.NumberOfRetries(nsbConfig.DelayedRetryCount)
            .TimeIncrease(TimeSpan.FromSeconds(nsbConfig.DelayedRetrySecondsIncrease));
    });

// Persistence
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
dialect.JsonBParameterModifier(
    parameter =>
    {
        var npgsqlParameter = (NpgsqlParameter) parameter;
        npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
    });

persistence.ConnectionBuilder(
    connectionBuilder: () => new NpgsqlConnection(connectionString));
persistence.SubscriptionSettings().CacheFor(
    TimeSpan.FromMinutes(nsbConfig.PersistenceCacheMinutes));

endpointConfiguration.EnableInstallers();

var containerSettings = endpointConfiguration.UseContainer(new DefaultServiceProviderFactory());
containerSettings.ServiceCollection.Add(services); 

NServiceBus.Logging.LogManager.Use<SerilogFactory>();

var endpoint = Endpoint.Start(endpointConfiguration).GetAwaiter().GetResult();
services.AddSingleton(endpoint);

Any help you can provide on how best to manage the configuration for this scenario, so that I can get past these DI exceptions, would be greatly appreciated! I’m not sure if I need two explicitly configured endpoints and two complete sets of IServiceCollections (which at present are built up across both Program.cs itself and an extension class), or something different.

Thanks in advance!
Anye

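A quick chat with @boblangley determined that the multiple queue thing was a red herring; the actual issue was order of operations. My Redis configuration was underneath my NSB configuration, so the classes registered as part of that setup had not yet been added to the service collection when it was copied into the NSB container (containerSettings.ServiceCollection.Add(services)) and the endpoint was started. The handlers therefore could not resolve them. Putting the NSB registration at the very end of my config section resolved the issue.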
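
The ordering problem can be reproduced without NServiceBus at all. The sketch below is a minimal illustration. It assumes the Add(services) call in the configuration above copies the descriptors present at the moment it runs, which is how the Add extension on IServiceCollection behaves. ITest, Test and Handler are trivial stand-ins, and endpointServices plays the role of the collection the endpoint builds its own provider from.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

public interface ITest { }
public class Test : ITest { }

// Stand-in for a message handler with a constructor dependency.
public class Handler
{
    public Handler(ITest test) { }
}

public static class Program
{
    public static void Main()
    {
        // The application's main service collection.
        var services = new ServiceCollection();

        // Stand-in for the collection the endpoint resolves handlers from.
        var endpointServices = new ServiceCollection();
        endpointServices.AddTransient<Handler>();

        // Copies only the descriptors registered so far (none, in this sketch).
        endpointServices.Add(services);

        // Registered after the copy, so the endpoint's provider never sees it.
        services.AddSingleton<ITest, Test>();

        using var provider = endpointServices.BuildServiceProvider();
        try
        {
            provider.GetRequiredService<Handler>();
        }
        catch (InvalidOperationException ex)
        {
            // Same kind of failure as in the log above: ITest cannot be resolved.
            Console.WriteLine(ex.Message);
        }
    }
}
```

Moving the services.AddSingleton<ITest, Test>() line above endpointServices.Add(services) lets the handler resolve, which mirrors moving the NSB registration to the end of the config section.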

Thanks, Bob!