Need help to implement service bus using NServiceBus with MSMQ

Hi All,

I need a service bus built on NServiceBus in which a message is published once and multiple subscribers each receive it as needed. I intend to use this in production.
What is the simplest way (i.e. the steps) to implement NServiceBus with MSMQ?

In detail:

  1. I have several applications (e.g. App1, App2, App3, App4) hosted remotely.
  2. Each application’s messages have to be published and subscribed to as needed.
  3. I want to use MSMQ as the transport with NServiceBus.

Hi,

From what I understand of your question, you want to use MSMQ as the transport, publish an event, and have multiple subscribers receive that event, with the endpoints possibly running on different servers?

We have a simple sample that will walk you through this specific scenario. Take a look at the pub/sub sample.

By default the sample uses the learning transport and learning persistence. To change it to use MSMQ, you’ll need to do the following:

  1. In Program.cs (for both publisher & subscriber), configure the endpoints to use MSMQ as transport. So in Program.cs, change the usage of LearningTransport to use MSMQ. Also change the persistence from LearningPersistence to InMemoryPersistence. Note that when deploying to prod, InMemoryPersistence is not a safe option. You’ll need to decide which persistence to use. The supported persistences are outlined here:
    Persistence • NServiceBus • Particular Docs
    If you’re using a SQL database, a good choice for production would be SqlPersistence or NHibernate.

    endpointConfiguration.UsePersistence<InMemoryPersistence>();
    endpointConfiguration.UseTransport<MsmqTransport>();

  2. MSMQ transport does not support Publish/Subscribe natively like the LearningTransport and RabbitMQ transports, therefore in addition to the above, you will also need to specify the publish/subscribe routing.

In the subscriber’s endpoint configuration, make sure you specify the routing:
    routing.RegisterPublisher(
        eventType: typeof(OrderReceived),
        publisherEndpoint: "Samples.PubSub.Publisher");

And that should be it. The routing documentation is here: Message routing • NServiceBus • Particular Docs

If your publisher is running on, say, ServerA, then you’ll specify the publisher endpoint as “Samples.PubSub.Publisher@ServerA” in the subscriber. Your subscribers can be running on different servers.

Hope this helps. Let us know if you have any further questions.

Cheers,
Indu Alagarsamy

Hi Indu Alagarsamy,

Thank you for your reply.

I tried the following steps, based on your guidance:

  1. I created a queue named ‘testqueue’ in the MSMQ environment.

  2. I created the Publisher and Subscriber projects in two separate solutions. As a first phase, I am working on the Publisher project, which should publish a message to the queue (i.e. testqueue). The code is below:

         string message = "This is an test message";
         string EndpointName = "testqueue";
    
         var endpointConfiguration = new EndpointConfiguration(EndpointName);
         endpointConfiguration.UseSerialization<JsonSerializer>();
    
         var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
         var connection = @"Data Source=RIPANPC\SqlExpress;Initial Catalog=TestMsmqDB;UID=sa;Password=Password123;Integrated Security=True";
         persistence.SqlVariant(SqlVariant.MsSqlServer);
         persistence.ConnectionBuilder(
             connectionBuilder: () =>
             {
                 return new SqlConnection(connection);
             });
    
         var subscriptions = persistence.SubscriptionSettings();
         subscriptions.CacheFor(TimeSpan.FromMinutes(1));            
    
         var transport = endpointConfiguration.UseTransport<MsmqTransport>();
         var routing = transport.Routing();
         routing.RegisterPublisher(
             assembly: typeof(RowMessage).Assembly,
             publisherEndpoint: EndpointName);
    
         endpointConfiguration.SendFailedMessagesTo("error");
         endpointConfiguration.EnableInstallers();
    
         var endpointInstance = await Endpoint.Start(endpointConfiguration)
             .ConfigureAwait(false);
    
         var rowmessage = new RowMessage
         {
             Message = message
         };
         await endpointInstance.Publish(rowmessage)
             .ConfigureAwait(false);
    
         await endpointInstance.Stop()
             .ConfigureAwait(false);
    

Result:

  1. The code above runs without any run-time error.
  2. The message count of ‘testqueue’ does not increase.
  3. Two tables (i.e. [dbo].[testqueue_SubscriptionData], [dbo].[testqueue_TimeoutData]) were created automatically, but they contain no data.

Please point me in the right direction on this issue.

Ripan Paul

Hi Ripan,

When you created the queue manually, did you check the “Transactional” checkbox? It is better not to create the queues manually. You can create queues in a couple of ways.

  1. You can call EnableInstallers() on the endpoint configuration, which will ensure that all the necessary queues are created properly. (Make sure you run Visual Studio in Administrator mode)
    https://github.com/Particular/docs.particular.net/blob/master/samples/msmq/simple/Core_6/Sample/Program.cs#L19

  2. When you’re about to deploy, it is better to create the needed queues in your environment. You can use PowerShell scripts to do that; see:
    MSMQ Transport Scripting • MSMQ Transport • Particular Docs
    This way, you don’t need to have the EnableInstallers code trying to verify that the queues are present and create them every time the endpoint is restarted. Queue creation is a one-time step that mostly belongs to installation.
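
As a rough illustration of that one-time installation step, here is a minimal C# sketch that creates the queues as transactional, so the “Transactional” checkbox is not forgotten. It is not the script from the documentation page above: it assumes the .NET Framework `System.Messaging` assembly, the class `QueueSetup` and method `EnsureTransactionalQueue` are hypothetical names, and it leaves out queue permissions and any additional queues the endpoint may need.

```csharp
using System;
using System.Messaging; // .NET Framework only (System.Messaging.dll)

// Hypothetical helper, not part of NServiceBus.
static class QueueSetup
{
    // Creates a private, transactional queue on the local machine if it is missing.
    public static void EnsureTransactionalQueue(string queueName)
    {
        var path = $@".\private$\{queueName}";

        if (MessageQueue.Exists(path))
        {
            using (var existing = new MessageQueue(path))
            {
                // A queue created by hand without the checkbox is non-transactional.
                if (!existing.Transactional)
                {
                    throw new InvalidOperationException(
                        $"Queue '{path}' exists but is not transactional.");
                }
            }
            return;
        }

        // The second argument marks the new queue as transactional.
        using (MessageQueue.Create(path, transactional: true))
        {
        }
    }

    static void Main()
    {
        EnsureTransactionalQueue("testqueue"); // endpoint input queue from the post above
        EnsureTransactionalQueue("error");     // queue named in SendFailedMessagesTo("error")
    }
}
```

Running something like this once per machine keeps queue creation out of the endpoint's normal start-up path.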

Looking at your code, you’re calling the RegisterPublisher API in your publisher, which is incorrect. This only needs to be done in the subscriber.

routing.RegisterPublisher(
        assembly: typeof(RowMessage).Assembly,
        publisherEndpoint: EndpointName);

Also, if a publisher does not have any subscribers, the events it publishes have nowhere to go. Try running the subscriber as well, add the RegisterPublisher code to your subscriber project, and let me know if that works.

The way pub/sub works in MSMQ is:

  1. The subscriber sends a subscription message to the publisher. This is why the subscriber needs to know who the publisher is for the event type (specified in your RegisterPublisher call)
  2. The publisher gets the message and stores the subscription info in the persistence storage you’ve specified
  3. When the publisher publishes an event, it looks at the subscription storage to see which subscribers are interested in the event and relays it to them.
    This is explained in detail here: Publish-Subscribe • NServiceBus • Particular Docs
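
To make the three steps concrete, here is a toy in-memory model of them in C#. It only illustrates the flow; none of these types are NServiceBus APIs, and `ToyPublisher`, `HandleSubscribe` and the queue name `subscriber.queue` are made up for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: these are NOT NServiceBus types.
class ToyPublisher
{
    // Step 2: subscription storage, keyed by event type name.
    readonly Dictionary<string, HashSet<string>> subscriptions =
        new Dictionary<string, HashSet<string>>();

    // Step 1 arrives here: the subscription message names the event type
    // and the queue of the subscriber that wants it.
    public void HandleSubscribe(string eventType, string subscriberQueue)
    {
        if (!subscriptions.TryGetValue(eventType, out var queues))
        {
            queues = new HashSet<string>();
            subscriptions[eventType] = queues;
        }
        queues.Add(subscriberQueue);
    }

    // Step 3: look up the interested subscribers and relay one copy to each.
    public List<string> Publish(string eventType, string body)
    {
        var deliveries = new List<string>();
        if (subscriptions.TryGetValue(eventType, out var queues))
        {
            foreach (var queue in queues)
            {
                deliveries.Add($"{queue} <- {eventType}: {body}");
            }
        }
        return deliveries;
    }
}

static class Demo
{
    static void Main()
    {
        var publisher = new ToyPublisher();

        // No subscriber has registered yet, so the event goes nowhere.
        Console.WriteLine(publisher.Publish("RowMessage", "first").Count); // prints 0

        // Once a subscriber has sent its subscription message, it gets a copy.
        publisher.HandleSubscribe("RowMessage", "subscriber.queue");
        foreach (var delivery in publisher.Publish("RowMessage", "second"))
        {
            Console.WriteLine(delivery); // prints "subscriber.queue <- RowMessage: second"
        }
    }
}
```

The first `Publish` call mirrors what you observed: with no subscription stored, publishing succeeds without error but nothing lands in any queue.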

Cheers,
Indu Alagarsamy

Hi Indu Alagarsamy,

I tried with your guidelines.

These are the steps I followed:

  1. This time, I did not create any queue manually in the MSMQ environment. I went with your option #1 (EnableInstallers), as you described above.
  2. Created Publisher Project with the following code:

        string message = "This is an testmessge";
        string EndpointName = "testqueue.Publisher";

        var endpointConfiguration = new EndpointConfiguration(EndpointName);
        endpointConfiguration.EnableInstallers();
        endpointConfiguration.SendFailedMessagesTo("error");

        endpointConfiguration.UseTransport<MsmqTransport>();
        
        endpointConfiguration.UsePersistence<InMemoryPersistence>();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);

        var rowmessage = new RowMessage
        {
            Message = message
        };
        await endpointInstance.Publish(rowmessage)
            .ConfigureAwait(false);

        await endpointInstance.Stop()
            .ConfigureAwait(false);
  3. Created Subscriber Project with the following code:

        string EndpointName_Subscriber = "testqueue.Subscriber";
        string EndpointName_Publisher = "testqueue.Publisher";

        var endpointConfiguration = new EndpointConfiguration(EndpointName_Subscriber);
        endpointConfiguration.UsePersistence<InMemoryPersistence>();

        var transport = endpointConfiguration.UseTransport<MsmqTransport>();
        var routing = transport.Routing();
        routing.RegisterPublisher(
            assembly: typeof(RowMessage).Assembly,
            publisherEndpoint: EndpointName_Publisher);

        endpointConfiguration.SendFailedMessagesTo("error");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
        await endpointInstance.Stop()
            .ConfigureAwait(false);

        public class MyHandler :
            IHandleMessages<RowMessage>
        {
            public Task Handle(RowMessage message, IMessageHandlerContext context)
            {
                return Task.CompletedTask;
            }
        }

Note: The Publisher and Subscriber projects are in separate solutions.

Result:

  1. Both projects run without any run-time error.
  2. When I run the Publisher project, the queue ‘testqueue.Publisher’ is created.
  3. When I run the Subscriber project, the queue ‘testqueue.Subscriber’ is created.
  4. The ‘Number of Messages’ column for the ‘testqueue.Publisher’ queue keeps increasing in the MSMQ environment.

*** Note: Is there any way to send you both projects that I tried to run, so that you can test them and tell me if I am missing anything?

Regards,
Ripan

Hi Ripan,

Can you upload your code to a GitHub repository, or to Google Drive or Dropbox, and paste a link where I can access it? I’d be happy to take a look.

Were you able to run the pub/sub sample with the instructions I gave you? You can find the source code to the pub/sub sample here:
https://github.com/Particular/docs.particular.net/tree/master/samples/pubsub/Core_6

Thanks,
Indu Alagarsamy

Hi Indu,

Please check the following links to get source code:

  1. Publisher: GitHub - ripanpaul/NServiceBus-Publisher: Publisher using NServiceBus
  2. Subscriber: GitHub - ripanpaul/NServiceBus-Subscribers: Subscribers using NServiceBus

Your working branch for both projects is “indu-development”. Please push that branch to origin after making your changes.

Some notes on the projects:

Publisher Project -

  1. I built it as a SOA service, using PublisherService.asmx for the service.

Subscriber Project -

  1. I created a console application.
  2. Created a handler to get message.

*** Can I chat with you regarding this issue? That would be more helpful for me.

Regards,
Ripan

Hi Indu,

I am also very concerned about persistence. I want to use SqlPersistence in production.
Please check the persistence code that I have already added to my source code (in both projects).

Thanks & Regards,
Ripan

Hi Ripan,

Creating an endpoint is expensive. You should only do that once per application. You currently create an endpoint instance for every call into your web API.

Create the instance in Global.asax.cs. Please look at any of the following samples for a better idea of how to do that:

Also very important, please read the following guidance about Publishing from a web application:

I wouldn’t recommend publishing directly from within your service API. To publish, the subscribers need to be fetched from SQL storage, and if that storage is down or slow, this can impact your API performance. It is better to first do a Send or SendLocal, just to make sure you have received and stored the incoming API request, and then have a handler do the actual Publish. It is very unlikely that MSMQ will be down or cause issues when you send the message to it.

Your webmethod would then become very simple and fast.

IMessageSession messageSession; // assign via dependency injection

[WebMethod]
public string MyService(string message)
{
    try
    {
        messageSession.Send(new MyService{ Value = message})
            .GetAwaiter().GetResult();
        return "Message published to bus";
    }
    catch (Exception ex)
    {
        return ex.Message;
    }
}

And have a handler that does the actual publish.

class MyServiceHandler : IHandleMessages<MyService>
{
    public Task Handle(MyService message, IMessageHandlerContext context)
    {
        return context.Publish(new RowMessage
        {
            Message = message.Value
        });
    }
}

Does this help?

Regards,
Ramon

Please give me the full code needed to send the message in the code below:

IMessageSession messageSession; // assign via dependency injection

[WebMethod]
public string MyService(string message)
{
    try
    {
        messageSession.Send(new MyService{ Value = message})
            .GetAwaiter().GetResult();
        return "Message published to bus";
    }
    catch (Exception ex)
    {
        return ex.Message;
    }
}

Note: As per above code, my observations are:

  1. No need to configure endpoint.
  2. No need to configure persistence.

Questions:

  1. I need to know the assembly and using directive for IMessageSession.

Regards,
Ripan Paul

Hi Ripan,

Maybe I wasn’t clear, but I didn’t write that there is no need to configure your endpoint.

You still need to configure your endpoint, but you must do that only once at application start. This is shown in the samples shared with you earlier.
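
As a rough sketch of that start-once pattern (this is not the content of the samples or of the fork mentioned below), the endpoint could be configured and started in Global.asax.cs along these lines. It assumes NServiceBus 6 with the MSMQ transport, reuses the endpoint name `testqueue.Publisher` from earlier in the thread, and uses InMemoryPersistence only as a placeholder; the static `MessageSession` property is a name made up for this sketch. `IMessageSession` comes from the `NServiceBus` namespace in the NServiceBus.Core assembly.

```csharp
using System;
using System.Web;
using NServiceBus; // IMessageSession, IEndpointInstance, EndpointConfiguration

public class Global : HttpApplication
{
    // Hypothetical shared accessor used by the web methods instead of dependency injection.
    public static IMessageSession MessageSession { get; private set; }

    static IEndpointInstance endpointInstance;

    // Runs once when the web application starts, not once per request.
    protected void Application_Start(object sender, EventArgs e)
    {
        var endpointConfiguration = new EndpointConfiguration("testqueue.Publisher");
        endpointConfiguration.UseTransport<MsmqTransport>();
        // Placeholder only; pick a durable persistence such as SqlPersistence for production.
        endpointConfiguration.UsePersistence<InMemoryPersistence>();
        endpointConfiguration.SendFailedMessagesTo("error");
        endpointConfiguration.EnableInstallers();

        // Application_Start is synchronous, so block on the start-up task here.
        endpointInstance = Endpoint.Start(endpointConfiguration)
            .GetAwaiter().GetResult();

        // IEndpointInstance is also an IMessageSession.
        MessageSession = endpointInstance;
    }

    // Stops the endpoint when the application shuts down.
    protected void Application_End(object sender, EventArgs e)
    {
        endpointInstance?.Stop().GetAwaiter().GetResult();
    }
}
```

With this in place, a web method could call `Global.MessageSession.SendLocal(new MyService { Value = message })`, which delivers the message to this endpoint's own queue, where MyServiceHandler does the actual Publish.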

For your convenience, I’ve forked your publisher repository and applied the changes as I’ve mentioned.

Although it isn’t using dependency injection you probably now understand what I meant.

Also, you applied an excessive amount of subscription caching, which I reduced from one hour to just 1 second. You should not cache subscriptions for that long.

Does that help?

Regards,
Ramon

Hi Indu/ Ramon,

I also need the following to be implemented in separate solutions:

Project #1:

  1. Check for an existing endpoint; if it does not exist, create it.
  2. Register publisher endpoints as needed.
  3. Start the endpoint.

Project #2:

  1. Only the handler, which will provide the message.

How can these be implemented?

Note: The two projects are in separate solutions.

Regards,
Ripan

Hi Ripan,

Unfortunately, I do not understand your questions. I also don’t know what you mean by Project #1 and #2. Please try to be as explicit as possible.

Could you try to explain what currently does not work for you?

Regards,
Ramon

Hi Indu/Ramon,

Regarding the subscriber, can I run the “Start Subscriber Endpoint” code and the “Handler” code separately, in separate solutions?

Regards,
Ripan

Hey Ripan

Yes, you can run your handlers in different processes as required. Here is a sample showing handlers in different processes sending messages to each other.

Hi,

I have checked the duplex code above. Our main concern is to execute the subscriber handler part from a web method that we expose. There will be an external third-party application (the subscriber) that calls this web method and receives, as the return value, the data picked up by the handler.

So the issue we are facing right now is how to tie the web method and the handler together. Is there any mechanism through which we can execute the handler part inside the web method and return the data to the subscriber application?

Regards,
Ripan

Dennis and I had a call today with @ripanpaul. We’ll follow up with a PoC and a discussion in a call about the PoC proposal.