I need a service bus built on NServiceBus in which a particular message is published once and multiple subscribers each receive it separately, as needed. I want to use this in production later.
What is the simplest way (i.e., the steps) to implement NServiceBus with MSMQ?
In detail:
I have many applications (e.g., App1, App2, App3, App4) hosted remotely.
The applications’ messages have to be published and subscribed to as needed.
I want to use MSMQ as the transport with NServiceBus.
From what I understand from your question, you want to use MSMQ as the transport. And you’d like to publish an event and you want multiple subscribers to receive the event and these endpoints can be running on different servers?
We have a simple sample that will walk you through this specific scenario. Take a look at
By default the sample uses the learning transport and learning persistence. To change it to use MSMQ, you’ll need to do the following:
In Program.cs (for both publisher & subscriber), configure the endpoints to use MSMQ as transport. So in Program.cs, change the usage of LearningTransport to use MSMQ. Also change the persistence from LearningPersistence to InMemoryPersistence. Note that when deploying to prod, InMemoryPersistence is not a safe option. You’ll need to decide which persistence to use. The supported persistences are outlined here: Persistence • NServiceBus • Particular Docs
If you’re using SQL DB, a good choice for production would be SqlPersistence or NHibernate.
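As a rough sketch of what the changed Program.cs could look like, assuming an NServiceBus version in which MsmqTransport and InMemoryPersistence are configured through UseTransport and UsePersistence (as in the code posted later in this thread). The endpoint name is the one from the sample; everything else mirrors calls already used in this thread. It needs the NServiceBus packages and a Windows machine with MSMQ installed, and async Main needs C# 7.1 or later.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

static class Program
{
    static async Task Main()
    {
        // Endpoint name taken from the sample; use your own name here.
        var endpointConfiguration = new EndpointConfiguration("Samples.PubSub.Publisher");

        // Replaces the sample's UseTransport<LearningTransport>() call.
        endpointConfiguration.UseTransport<MsmqTransport>();

        // Replaces UsePersistence<LearningPersistence>().
        // In-memory storage loses subscriptions on restart, so it is not safe for production.
        endpointConfiguration.UsePersistence<InMemoryPersistence>();

        endpointConfiguration.SendFailedMessagesTo("error");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();

        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }
}
```

The same two replacements would apply to the subscriber's Program.cs.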
MSMQ transport does not support Publish/Subscribe natively like the LearningTransport and RabbitMQ transports, therefore in addition to the above, you will also need to specify the publish/subscribe routing.
In the subscriber’s endpoint configuration, make sure you specify the routing:
routing.RegisterPublisher(
    eventType: typeof(OrderReceived),
    publisherEndpoint: "Samples.PubSub.Publisher");
If your publisher is running on, say, ServerA, then you’ll specify the publisher endpoint as “Samples.PubSub.Publisher@ServerA” in the subscriber. Your subscribers can be running on different servers.
Hope this helps. Let us know if you have any further questions.
I created the Publisher and Subscriber projects in two separate solutions. In the first phase, I am trying the Publisher project, which should publish a message to a queue (i.e. testqueue). The code is shared below:
string message = "This is an test message";
string EndpointName = "testqueue";
var endpointConfiguration = new EndpointConfiguration(EndpointName);
endpointConfiguration.UseSerialization<JsonSerializer>();
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var connection = @"Data Source=RIPANPC\SqlExpress;Initial Catalog=TestMsmqDB;UID=sa;Password=Password123;Integrated Security=True";
persistence.SqlVariant(SqlVariant.MsSqlServer);
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connection);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
var transport = endpointConfiguration.UseTransport<MsmqTransport>();
var routing = transport.Routing();
routing.RegisterPublisher(
    assembly: typeof(RowMessage).Assembly,
    publisherEndpoint: EndpointName);
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);
var rowmessage = new RowMessage
{
    Message = message
};
await endpointInstance.Publish(rowmessage)
    .ConfigureAwait(false);
await endpointInstance.Stop()
    .ConfigureAwait(false);
Result:
The code above runs without any run-time error.
The message count of ‘testqueue’ does not increase.
Two tables (i.e. [dbo].[testqueue_SubscriptionData], [dbo].[testqueue_TimeoutData]) were created automatically, but they contain no data.
Please help me find the right direction on this issue.
When you created the queue manually, did you check the “Transactional” checkbox? It is better not to create the queues manually. You can create queues in a couple of ways.
When you’re about to deploy, it is better to create the needed queues in your environment. You can use PowerShell scripts to do that. See: MSMQ Transport Scripting • MSMQ Transport • Particular Docs
This way, you don’t need the EnableInstallers code verifying that the queues are present and creating them every time the endpoint is restarted. Queue creation is a one-time step that belongs to installation.
Looking at your code, you’re calling the RegisterPublisher API in your publisher. This is incorrect; it only needs to be done in the subscriber.
Also, if a publisher does not have any subscribers, the published events have nowhere to go. Try running the subscriber as well, add the RegisterPublisher code to your subscriber project, and let me know if that works.
The way pub/sub works in MSMQ is:
1. The subscriber sends a subscription message to the publisher. This is why the subscriber needs to know who the publisher is for the event type (specified in your RegisterPublisher call).
2. The publisher gets the message and stores the subscription info in the persistence storage you’ve specified.
3. When the publisher publishes an event, it looks at the subscription storage to see which subscribers are interested in the event and relays it to them.
This is explained in detail here: Publish-Subscribe • NServiceBus • Particular Docs
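To make the steps above a bit more concrete, here is a small in-memory sketch. It is not NServiceBus code: SubscriptionStore and SketchPublisher are hypothetical names, RowMessage is an empty stand-in for your event class, and a plain list stands in for the MSMQ queues. It only illustrates why an event published before any subscription has been stored goes nowhere.

```csharp
using System;
using System.Collections.Generic;

// Empty stand-in for the event class from the thread.
public sealed class RowMessage { }

// Hypothetical stand-in for the publisher's subscription storage (step 2).
public sealed class SubscriptionStore
{
    private readonly Dictionary<Type, HashSet<string>> subscribersByEvent =
        new Dictionary<Type, HashSet<string>>();

    public void Add(Type eventType, string subscriberQueue)
    {
        if (!subscribersByEvent.TryGetValue(eventType, out var subscribers))
        {
            subscribers = new HashSet<string>();
            subscribersByEvent[eventType] = subscribers;
        }
        subscribers.Add(subscriberQueue);
    }

    public List<string> Find(Type eventType)
    {
        return subscribersByEvent.TryGetValue(eventType, out var subscribers)
            ? new List<string>(subscribers)
            : new List<string>();
    }
}

// Hypothetical publisher that records deliveries instead of writing to real queues.
public sealed class SketchPublisher
{
    private readonly SubscriptionStore store = new SubscriptionStore();

    // Queue names that received a copy of a published event.
    public List<string> Delivered { get; } = new List<string>();

    // Steps 1 and 2: a subscription message arrives and is stored.
    public void HandleSubscription(Type eventType, string subscriberQueue)
    {
        store.Add(eventType, subscriberQueue);
    }

    // Step 3: look up the interested subscribers and relay a copy to each.
    // Returns the number of subscribers the event was relayed to.
    public int Publish(object evt)
    {
        var targets = store.Find(evt.GetType());
        Delivered.AddRange(targets);
        return targets.Count;
    }
}
```

With no stored subscription, Publish relays to nobody, which matches the empty queue and empty subscription table reported above. After HandleSubscription has been called for a subscriber queue, the same Publish call relays one copy to it.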
var endpointConfiguration = new EndpointConfiguration(EndpointName_Subscriber);
endpointConfiguration.UsePersistence<InMemoryPersistence>();
var transport = endpointConfiguration.UseTransport<MsmqTransport>();
var routing = transport.Routing();
routing.RegisterPublisher(
    assembly: typeof(RowMessage).Assembly,
    publisherEndpoint: EndpointName_Publisher);
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);
Console.WriteLine("Press any key to exit");
Console.ReadKey();
await endpointInstance.Stop()
    .ConfigureAwait(false);
public class MyHandler :
    IHandleMessages<RowMessage>
{
    // Invoked for every RowMessage event delivered to this subscriber.
    public Task Handle(RowMessage message, IMessageHandlerContext context)
    {
        return Task.CompletedTask;
    }
}
Note: The Publisher and Subscriber projects are in separate solutions.
Result:
Both projects run without any run-time error.
When the Publisher project runs, the queue ‘testqueue.Publisher’ is created.
When the Subscriber project runs, the queue ‘testqueue.Subscriber’ is created.
The ‘Number of Messages’ column for the registered queue ‘testqueue.Publisher’ increases in the MSMQ environment.
*** Note: Is there any way to send you both projects that I tried to run, so that you can test them and tell me if I am missing anything?
Can you upload your code to a GitHub repository, or to Google Drive or Dropbox, and paste a link where I can access it? I’d be happy to take a look.
Also, I am very concerned about persistence. I want to use SqlPersistence in production.
Please check the persistence implementation code that I have already added to my source code (i.e. in both projects).
Creating an endpoint is expensive. You should only do that once per application. You currently create an endpoint instance for every call into your web API.
Create the instance in Global.asax.cs. Please look at any of the following samples to get a better idea of how to do that:
I wouldn’t recommend publishing directly from within your service API. To publish, the subscribers need to be fetched from SQL storage, and if that storage is down or slow, this can impact your API performance. It is better to first do a Send or SendLocal, just to make sure you have received and stored the incoming API request, and then have a handler do the actual Publish. It is very unlikely that MSMQ will be down or cause issues when the message is sent.
Your webmethod would then become very simple and fast.
IMessageSession messageSession; // assign via dependency injection

[WebMethod]
public string MyService(string message)
{
    try
    {
        // Block until the message has been handed to the transport.
        messageSession.Send(new MyService{ Value = message})
            .GetAwaiter().GetResult();
        return "Message published to bus";
    }
    catch (Exception ex)
    {
        return ex.Message;
    }
}
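The handler that then does the actual Publish could look roughly like the sketch below. PublishRowMessage and PublishRowMessageHandler are hypothetical names chosen for illustration (the web method above would send this command instead), and the RowMessage shape is assumed from the code posted earlier in this thread.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical command carrying the value received by the web method.
public class PublishRowMessage : ICommand
{
    public string Value { get; set; }
}

// Event from the thread; the Message property is assumed from the posted code.
public class RowMessage : IEvent
{
    public string Message { get; set; }
}

// Runs on the receiving endpoint, outside the web request.
public class PublishRowMessageHandler :
    IHandleMessages<PublishRowMessage>
{
    public Task Handle(PublishRowMessage message, IMessageHandlerContext context)
    {
        // The subscription lookup happens here, so a slow or unavailable
        // subscription store no longer slows down the web method.
        return context.Publish(new RowMessage { Message = message.Value });
    }
}
```

If the handler lives in the same endpoint as the web service, SendLocal avoids the need for a route. With Send, the command type needs a destination registered through the routing configuration.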
Maybe I wasn’t clear, but I didn’t write that there is no need to configure your endpoint.
You still need to configure your endpoint, but you must do that only once at application start. This is shown in the samples shared with you earlier.
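A minimal sketch of that idea for Global.asax.cs, assuming a classic ASP.NET application and the same NServiceBus API generation as the code in this thread. The EndpointInstance property name is hypothetical, the endpoint name is the publisher queue name reported earlier, and the persistence shown is a placeholder for whichever one you choose.

```csharp
using System;
using System.Web;
using NServiceBus;

public class Global : HttpApplication
{
    // Single endpoint instance shared by all requests (hypothetical property name).
    public static IEndpointInstance EndpointInstance { get; private set; }

    protected void Application_Start(object sender, EventArgs e)
    {
        var endpointConfiguration = new EndpointConfiguration("testqueue.Publisher");
        endpointConfiguration.UseTransport<MsmqTransport>();
        // Placeholder only; swap in your production persistence.
        endpointConfiguration.UsePersistence<InMemoryPersistence>();
        endpointConfiguration.SendFailedMessagesTo("error");

        // Started once when the application starts, not once per request.
        EndpointInstance = Endpoint.Start(endpointConfiguration)
            .GetAwaiter().GetResult();
    }

    protected void Application_End(object sender, EventArgs e)
    {
        // Stop the endpoint when the application shuts down.
        EndpointInstance?.Stop().GetAwaiter().GetResult();
    }
}
```

Because IEndpointInstance is also an IMessageSession, a web method can use Global.EndpointInstance to send messages until dependency injection is wired up.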
For your convenience, I’ve forked your publisher repository and applied the changes as I’ve mentioned.
Although it isn’t using dependency injection, you probably now understand what I meant.
Also, you applied an excessive amount of subscription caching, which I reduced from one hour to just 1 second. You should not cache subscriptions for that long.
Yes, you can run your handlers in different processes as required. Here is a sample showing handlers in different processes sending messages to each other.
I have checked the duplex code above. Our main concern is to execute the Subscriber handler from a web method that is exposed. There will be an external third-party application (the subscriber) that calls the web method we expose and, as the return value, receives the data picked up by the handler.
So the issue we are facing right now is relating the web method and the handler. Is there any mechanism through which we can execute the handler inside the web method and return the data to the subscriber application?