One Service, Multiple Endpoints/Virtual Hosts, DI - Leads to incorrect behavior

NSB 7.1, RabbitMQ, Self-hosted windows services, Autofac DI

The diagram below is a map of this system. There are N number of initiating apps (left side), which process data, and then they all have a surrogate processing service that handles uploading the data somewhere (file shipper). In addition, each initiating app submits an analysis result set to a common, shared data warehouse endpoint, which then uses configuration stored in a database to determine where that analysis data should go. In most cases, the analysis data is “teed” to multiple destination queues, which each handle importing to their own isolated DBs.

As I have it set up, the initiator service can successfully communicate to the associated uploading service queue, but when ever it tries to send a datawarehouse message, I’m getting errors in the log that state there is no route or the exchange does not exist in the ‘Customer’ vhost. The queues and exchanges exist and were all created correctly where they belong, it just seems that NServiceBus is not using the vhost of the endpoint I’m doing the send from, and is instead choosing the other vhost to send to?

Essentially, every message winds up on the endpoint used to create the fileshipper integration (which also happens to be the first one configured during service start-up). I need to have these DW messages go to the DW endpoint, but I can’t get it to work, and I feel like I’m missing something.

I have this implemented and working in a VS solution I used as a proof of concept. But, as I’m trying to retrofit the existing service that already does work using a different connection, I’m having issues. I can’t upload the zip of that solution for being a new user, otherwise I’d share it.

If I understand correctly, you’re trying to have endpoints connecting to different vhosts send messages to each other? That is not something that will just work out of the box. A vhost is a boundary just like using separate brokers would be.

Hi Brandon, thanks for replying!
Not exactly, the two endpoints are separate workflows that don’t interact. Basically one is a “go do something with this message” and the other is “log this message”. Its just odd that when I have the instance object for one of the endpoints, the error message logs about missing routes on the other.

It seems to be related to this line, when I remove it from both endpoint creation sections, then it behaves as desired…

endpointConfig.UseContainer(customizations => { customizations.ExistingLifetimeScope(Bootstrapper.Container); });

I’m going to make a quick app to demonstrate this problem, I feel like it might be a bug.

Below is my working example demonstrating this issue. I tried to be as concise as possible. You should be able to drag this into a new .NET Console App, install these three packages:

  <package id="NServiceBus" version="7.2.1" targetFramework="net462" />
  <package id="NServiceBus.Autofac" version="7.0.0" targetFramework="net462" />
  <package id="NServiceBus.RabbitMQ" version="5.1.2" targetFramework="net462" />

And adjust the connection strings to your suiting. I created two brand new, empty VHosts for this experiment. To get NSB to create the queues and exchanges, run it once with the two “SendOnly” lines commented out, and then put them back in for the actual test. This just saves me from having to also write a consumer for this demo.

Here’s how it works, you run it, it configures the endpoints, and then DI is used to generate an instance of a service, which exposes both endpoint instances (through clever hiding, to work around the chicken-or-the-egg DI problem with NSB) and sends a message specific to each endpoint.

Whenever the lines that say blah.UseContainer<AutofacBuilder> are uncommented, the program works correctly. Messages are put into the queues and there’s no issues. If you uncomment those lines in each endpoint section and then run it, curiously you’ll receive this error message:

image (coming from the send call for dwEp/ep2msg.)

Note how the message says “DataWarehouse” exchange, but in the “WF” queue. This is the bug. Or maybe bug, I don’t know, but something definitely gets whacked out when DI is used for both.

    using Autofac;
    using NServiceBus;
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace DemoForSupport
    {
    	class Program
    	{
    		public static async Task Main(string[] args)
    		{
    			//Set up DI
    			Bootstrapper.Initialize();
    
    			//Create endpoint 1
    			var endpointConfig = new EndpointConfiguration("Workflow");
    			endpointConfig.SendOnly(); //Run once with this commented out in both endpoints to set them up
    			endpointConfig.AuditProcessedMessagesTo("audit");
    			endpointConfig.EnableInstallers();
    			var transport = endpointConfig.UseTransport<RabbitMQTransport>();
    			transport.UseConventionalRoutingTopology();
    			transport.ConnectionString("host=rabbit1;virtualhost=WF;username=admin;password=password;RequestedHeartbeat=100;");
    			var routing = transport.Routing();
    			routing.RouteToEndpoint(typeof(Endpoint1Message), "Workflow");
    			endpointConfig.License(ConfigurationManager.AppSettings["License"]);
    			endpointConfig.UseContainer<AutofacBuilder>(customizations => { customizations.ExistingLifetimeScope(Bootstrapper.Container); });
    			EndpointConfigurator.EndpointInstance = await Endpoint.Start(endpointConfig).ConfigureAwait(false);
    			Console.WriteLine("Endpoint 1 set up");
    
    			//Create endpoint 2
    			var endpointConfig2 = new EndpointConfiguration("DataWarehouse");
    			endpointConfig2.SendOnly(); //Run once with this commented out in both endpoints to set them up
    			endpointConfig2.EnableInstallers();
    			endpointConfig2.License(ConfigurationManager.AppSettings["License"]);
    			var transport2 = endpointConfig2.UseTransport<RabbitMQTransport>();
    			transport2.UseDirectRoutingTopology();
    			transport2.ConnectionString("host=rabbit1;virtualhost=DW;username=admin;password=password;RequestedHeartbeat=100;");
    			var routing2 = transport2.Routing();
    			routing2.RouteToEndpoint(typeof(Endpoint2Message), "DataWarehouse");
    			endpointConfig2.UseContainer<AutofacBuilder>(customizations => { customizations.ExistingLifetimeScope(Bootstrapper.Container); });
    			var instance2 = await Endpoint.Start(endpointConfig2).ConfigureAwait(false);
    			EndpointConfigurator.DatawarehouseEndpointInstance = new DataWarehouseEndpointInstance() { EndpointInstance = instance2 };
    
    			Console.WriteLine("Endpoint 2 set up");
    
    			do
    			{
    				Console.WriteLine("Doing stuff.. press X to exit.");
    				using (var lts = Bootstrapper.Container.BeginLifetimeScope())
    				{
    					var svc = lts.Resolve<IThingySvc>();
    					await svc.DoStuff();
    				}
    			} while (Console.ReadKey().Key != ConsoleKey.X);
    		}
    	}
    
    	public static class Bootstrapper
    	{
    		public static IContainer Container { get; private set; }
    
    		public static void Initialize()
    		{
    			ContainerBuilder builder = new ContainerBuilder();
    			builder.RegisterType<ThingySvc>().AsImplementedInterfaces().InstancePerLifetimeScope();
    			builder.Register(x => EndpointConfigurator.EndpointInstance).As<IEndpointInstance>().SingleInstance();
    			builder.Register(x => EndpointConfigurator.DatawarehouseEndpointInstance).As<IDataWarehouseEndpointInstance>().SingleInstance();
    			Container = builder.Build();
    			Console.WriteLine("Bootstrapper (DI) set up");
    		}
    	}
    
    	public interface IThingySvc { Task DoStuff(); }
    
    	public class ThingySvc : IThingySvc
    	{
    		private IEndpointInstance ep;
    		private IEndpointInstance dwEp;
    
    		public ThingySvc(IEndpointInstance ep, IDataWarehouseEndpointInstance dwEp)
    		{
    			this.ep = ep;
    			this.dwEp = dwEp.EndpointInstance;
    		}
    
    		public async Task DoStuff()
    		{
    			//Send messages to each EP.
    			Endpoint1Message ep1msg = new Endpoint1Message();
    			ep1msg.Data = Guid.NewGuid();
    			await ep.Send(ep1msg).ConfigureAwait(false);
    
    			Endpoint2Message ep2msg = new Endpoint2Message();
    			ep2msg.Data = Guid.NewGuid();
    			await dwEp.Send(ep2msg).ConfigureAwait(false);
    		}
    	}
    
    	public interface IDataWarehouseEndpointInstance
    	{
    		IEndpointInstance EndpointInstance { get; set; }
    	}
    
    	public class DataWarehouseEndpointInstance : IDataWarehouseEndpointInstance
    	{
    		public IEndpointInstance EndpointInstance { get; set; }
    	}
    
    	public class EndpointConfigurator
    	{
    		public static IEndpointInstance EndpointInstance { get; set; }
    		public static IDataWarehouseEndpointInstance DatawarehouseEndpointInstance { get; set; }
    	}
    
    	public class Endpoint1Message : IMessage
    	{
    		public Guid Data { get; set; }
    	}
    
    	public class Endpoint2Message : IMessage
    	{
    		public Guid Data { get; set; }
    	}
    }

If you have multiple endpoints in a single process, you need to create separate DI containers for each endpoint. It appears in the code you’ve posted that they are sharing a container.

That would definitely explain it!