NServiceBus Router with MSMQ and SQS

I am trying to do a proof of concept to establish how the Router can work between an MSMQ endpoint and another endpoint that utilizes the AWS SQS transport. Both endpoints are hosted outside of AWS and in this case on my local PC. I am making use of the sites example off of the Particular web site and attempting to modify Router B to send the message coming from the MSMQ endpoint to SQS so that the SQS endpoint can pick up the message.
Under normal circumstances my SQS transport endpoint has no issue writing to the SQS queue, but now I believe the router has to be able to send the message to the SQS queue.

Also note my region, SQS access, and secret keys are in my app.config

I am configuring the interface on the router as such

routerConfig.AddInterface<SqsTransport>("Local", t => { });

When the router attempts to start I get the following error:

System.Collections.Generic.KeyNotFoundException
HResult=0x80131577
Message=The given key (NServiceBus.Unicast.Messages.MessageMetadataRegistry) was not present in the dictionary.
Source=NServiceBus.Core
StackTrace:
at NServiceBus.Settings.SettingsHolder.Get(String key) in //src/NServiceBus.Core/Settings/SettingsHolder.cs:line 80
at NServiceBus.Settings.SettingsHolder.GetT in /
/src/NServiceBus.Core/Settings/SettingsHolder.cs:line 70
at NServiceBus.Transport.SQS.Configure.SqsTransportInfrastructure…ctor(ReadOnlySettings settings) in //src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs:line 24
at NServiceBus.SqsTransport.Initialize(SettingsHolder settings, String connectionString) in /
/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs:line 42
at NServiceBus.Raw.InitializableRawEndpoint.d__1.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter1.GetResult() at ThrottlingRawEndpointConfig1.d__3.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult() at Interface1.d__5.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at RouterImpl.d__1.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at RouterImpl.d__2.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at Program.d__0.MoveNext() in C:\Dev\sites_router\RouterB\Program.cs:line 28

This exception was originally thrown at this call stack:
[External Code]
Program.Main() in Program.cs

Hi @RGBZ

Unfortunately this is glitch with the SQS transport. It has not been designed with this usage scenario in mind and expects certain components that are available only when running full NServiceBus endpoint. This problem is going to be removed once NServiceBus 8.0 is out.

In the meantime there is a workaround to manually add the MessageMetadataRegistry as described in this issue.

I am sorry for the inconveniance.

Szymon

Thank you for the response. I still having some issues making use of that work around.
I do have the region, key, and secret key in my app config but I am not sure if I should be using var client = new AmazonSQSClient(new EnvironmentVariablesAWSCredentials()). I have tried without the environment variables thing also without success. I am getting the following error:

System.InvalidOperationException
HResult=0x80131509
Message=The environment variables AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY/AWS_SESSION_TOKEN were not set with AWS credentials.
Source=AWSSDK.Core
StackTrace:
at Amazon.Runtime.EnvironmentVariablesAWSCredentials.FetchCredentials()
at Program.<>c.b__0_4() in C:\Dev\sites_router\RouterB\Program.cs:line 31
at NServiceBus.Transport.SQS.Configure.SqsTransportInfrastructure…ctor(ReadOnlySettings settings) in //src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs:line 33
at NServiceBus.SqsTransport.Initialize(SettingsHolder settings, String connectionString) in /
/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs:line 42
at NServiceBus.Raw.InitializableRawEndpoint.d__1.MoveNext()

@SzymonPobiega
I have to do a little more experimentation. I went with the default constructor on the AmazonSQSClient since I have the creds in my app config. The endpoint was not starting up initially and showing the error I mentioned in the previous post. But now it’s working and the endpoint started up and created the SQS queues.

@SzymonPobiega
Ok, so I successfully had my MSMQ endpoint successfully send a message to my SQS queue, so definitely thank you for pointing me in the right direction.

I also have been doing all of my poc testing with NServiceBus version 7.4.6 for everything.
In my environment the new SQS base endpoint will be 7.4.6, but the existing MSMQ end point is 6.5.4

I do have 2 additional question though.

  1. I read somewhere that the MSMQ endpoint needs to do something different to connect to the router. What do I need to know about the version differences ?

  2. Everything is on my local development machine. If endpoints and routers will be on different machines, how do they route to each other ? Does it still use the instances.xml file for this or is there another mechanism ?

Thank you

  1. Yes, you need to use NServiceBus.Bridge.Connector 1.x to connect to the Router.
  2. It is a bit more complex

In the MSMQ endpoint you call ConnectToBridge API and that one expects an address (queue@machine) so you need to store that by yourself e.g. in app settings to be able to modify it in each environment.

There is also the SQS side i.e. when you want to send messages from SQS to MSMQ. In the Router connector of the SQS endpoint you can only specify endpoint name, not the machine name. The appropriate place to specify the machine name is the Router’s MSMQ interface but unfortunately the instance mapping file does not work. This is because the instance mapping logic relies on the fact that the transport runs in a full endpoint. Instead, you need to manually provide the instances like this:

msmqInterface.EndpointInstances.AddOrReplaceInstances("config", new List<EndpointInstance>
        {
            new EndpointInstance("Samples.Router.MixedTransports.Client", null, new Dictionary<string, string>
            {
                {"queue" , "otherqueue"}
            })
        });

I tested the code above in a slightly modified “Mixed transports” sample to redirect messages sent from the RabbitMQ side to a custom queue instead of the queue that matches the endpoint name. In order to modify only the machine name, you use the machine key in the dictionary.

Once you are done with the PoC and happy with the result, you can easily copy the code from the instance file parser to your router project so that you can use the same familiar instance mapping file routing syntax in the router, too.

@SzymonPobiega
Thank you for that information. I did have one additional question. If I were to use the latest version of NServiceBus for the MSMQ side and used the connector packages instead, does the ConnectToRouter also expect a string representing queue@machine ?

Thank you

Actually, while looking up the answer I found a bug. The Router Connector treats the provided name in one place as the endpoint name and in the other as the address. I’ll fix it to use the address.

While it might be somewhat more complex this way, this is aligned with other NServiceBus APIs e.g. for providing the audit or error queues. All these APIs operate on the address level and not on the endpoint level.

@SzymonPobiega
Thank you so much !!!

@SzymonPobiega
Sorry to keep asking more questions, but I do have one more. In the example above with going from SQS to MSMQ you mention {“queue” , “otherqueue”}. What does queue and otherqueue represent ?

Update:
I may have figured it out. This seems to work

        msmqInterface.EndpointInstances.AddOrReplaceInstances("config", new List<EndpointInstance> {
            new EndpointInstance("Samples.Router.Sites.Client", null, new Dictionary<string, string>{
                { "queue", "samples.router.sites.client" },
                { "machine", "localhost" }
            })
        });

"queue" is a property MSMQ transport understands (same as "machine") and allows to override queue or machine part of the address via configuration (e.g. the instances file). The "otherqueue" is just a queue I created on my local machine to make sure the information is used property.

If you just need to override the machine, you can omit the queue property entirely (the endpoint name will be used as queue name).

I pushed version 3.9.0 which fixes this bug. It also allows connecting to multiple routers from a single endpoint.

@SzymonPobiega
Thank you so much, I really appreciate it