Increasing performance NServiceBus / RabbitMQ

We currently have a machine running RabbitMQ 3.8.2 which we already use for some standard production work, however as we are now scaling out we need to be able handle more and more messages. Note: All work/testing below is on our test enviroment.

I am currently testing the publishing using NServiceBus (7.2.3) on .Net Core 3.1. This using messages which are roughly 70 to 80 bytes (however, lets keep it at 100), pushing to a Lazy queue.
When running the RabbitMQ Performance test using these parameters, i get a publish speed of about 25K to 30K messages per second.
When running on our end, publishing 100K messages takes on average 2 minutes (+/- 620 msg/sec)
As this process will be able to deploy a factor 20 more then 100K messages we need to up the speed.

Additional: Running the same test against an Azure Service Bus (standard) instance, gives a throughput of about 500 messages/sec

Now i am not expecting the same throughput as the PerfTest, but from 25K to just 600 is a big difference for me.
Is this expected? Can we increase this somehow?

Our NServiceBus setup is as follows.
Disabling the Heartbeat, Audit and ServiceControl does not matter in terms of performance.

.UseNServiceBus(hostBuilderContext =>
{
var nserviceBusConfig = hostBuilderContext.Configuration.GetSection("NServiceBus").Get<NServiceBusConfig>();

var endpointConfiguration = new EndpointConfiguration(Endpoints.Batch);
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
endpointConfiguration.EnableInstallers();
endpointConfiguration.LicensePath("/app/config/NServiceBusLicense.xml");

if (!string.IsNullOrEmpty(nserviceBusConfig.HeartbeatQueue))
{
	endpointConfiguration.SendHeartbeatTo(
		serviceControlQueue: nserviceBusConfig.HeartbeatQueue,
		frequency: TimeSpan.FromSeconds(15),
		timeToLive: TimeSpan.FromSeconds(30));
}

if (!string.IsNullOrEmpty(nserviceBusConfig.AuditQueue))
	endpointConfiguration.AuditProcessedMessagesTo(nserviceBusConfig.AuditQueue);

if (!string.IsNullOrEmpty(nserviceBusConfig.MetricsQueue))
{
	var metrics = endpointConfiguration.EnableMetrics();
	metrics.SendMetricDataToServiceControl(
		serviceControlMetricsAddress: nserviceBusConfig.MetricsQueue,
		interval: TimeSpan.FromSeconds(2));
}

var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(x => x.NumberOfRetries(5));

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseConventionalRoutingTopology();
transport.ConnectionString(HostHelpers.GetConnectionString(hostBuilderContext.Configuration, "Transport"));
SetRouting(transport);

return endpointConfiguration;
})

Publishing the messages to the queue is as follows:

var tasks = new ConcurrentBag<Task>();
Parallel.ForEach(lotCommands, new ParallelOptions { MaxDegreeOfParallelism = 20 }, command =>
{
	tasks.Add(_messageSession.Send(command));
});
await Task.WhenAll(tasks);

After posting the above message, i went further with testing.
I have changed our implementation to use the basic RabbitMQ.Client implementation to lower the overhead caused by NServiceBus.
When adding all the headers which NServiceBus also adds, we still have a publish rate of about 4600 messages per second.

We are using the following implementation for this:

var factory = new ConnectionFactory() { Uri = new Uri("") };
using (var connection = factory.CreateConnection())
{
	using (var channel = connection.CreateModel())
	{
		Parallel.ForEach(lotCommands, new ParallelOptions { MaxDegreeOfParallelism = 20 }, command =>
		{
			var body = Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(command));

			var messageid = Guid.NewGuid().ToString();
			var correlationid = Guid.NewGuid().ToString();

			IBasicProperties props = channel.CreateBasicProperties();
			props.Type = "REDACTED";
			props.MessageId = messageid;
			props.ReplyTo = "REDACTED";
			props.CorrelationId = correlationid;
			props.DeliveryMode = 2;
			props.Headers = new Dictionary<string, object>();
			props.Headers.Add("$.diagnostics.originating.hostid", "REDACTED");
			props.Headers.Add("NServiceBus.ContentType", "application/json");
			props.Headers.Add("NServiceBus.ConversationId", Guid.NewGuid().ToString());
			props.Headers.Add("NServiceBus.CorrelationId", correlationid);
			props.Headers.Add("NServiceBus.EnclosedMessageTypes", "REDACTED");
			props.Headers.Add("NServiceBus.MessageId", messageid);
			props.Headers.Add("NServiceBus.MessageIntent", "Send");
			props.Headers.Add("NServiceBus.OriginatingEndpoint", "REDACTED");
			props.Headers.Add("NServiceBus.OriginatingMachine", "REDACTED");
			props.Headers.Add("NServiceBus.ReplyToAddress", "REDACTED");
			props.Headers.Add("NServiceBus.TimeSent", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss:ffffff Z"));
			props.Headers.Add("NServiceBus.Transport.RabbitMQ.ConfirmationId", "40");
			props.Headers.Add("NServiceBus.Version", "7.2.3");
			props.ContentType = "application/json";

			channel.BasicPublish(exchange: "",
								routingKey: "REDACTED",
								basicProperties: props,
								body: body);
				
			});

	}
}

Hi @viwaABF

600 is a very low number. In our experience you should be getting values around 3000 with NServiceBus, give or take. Regarding the performance test, does it enable publisher confirms? Without them, you are basically sending messages to RabbitMQ without waiting for the broker to acknowledge them.

Not sure how Parallel.For works but in our tests we usually spawn sending tasks via Task.Run and cap the limit of concurrent sends by having a semaphore. Something like this (pseudocode):

while (true)
{
    await semaphore.Wait();
    Task.Run(async () => {
        try
        {
            SendMessage();
        }
        finally
        {
            semaphore.Release();
        }
    });
}

another approach might be to spawn a fixed number of tasks/threads and try sending using them.

I have another question: what is the scenario you have in mind that requires sending of 1000s of messages from outside of a handler?

Szymon

Regarding the PerfTest. This is the standard RabbitMQ PerfTest tool: RabbitMQ PerfTest

Since we don’t care about limitations at this point we use a Parallel.ForEach which basically does the same as above, just faster as it is not limited to 1 thread so it is creating the tasks faster.
So when changing the syntax to the given example, we saw an increase in performance (and afterwards it stopped working after about 2-3 minutes)

So we swapped the Parallel out for a normal foreach, making the syntax as follows.

foreach (var command in lotCommands)
{
    tasks.Add(Task.Run(async () =>
    {
        await _messageSession.Send(command);
    }));
}

Which has given an increase in performance, for 500k messages it took about 478 seconds. Giving us an average of 1044 messages per second.
This is faster, but not as fast as we expect.

Additional
Also yesterday we have noticed that our service, which also runs with the same config as above, only pick up the messages at most 30 per second.

Given the scenario, We need to update about 2.7 million records every night from an on-premise system to an off-premise system.
This process needs to be as fast as possible, however when doing the initial tests we noticed that NServiceBus itself is really slow on our end which we now decided to look into first before we continue as we currently have a reliable way of testing this.
If the solution we are currently working on is correct or not (probably not at this point given the performance) is not important at this time as this is not set in stone.

Hi

I definitely don’t encourage using unbounded concurrency as in the example with normal foeareach. This can easily overwhelm the runtime.

I did some tests using following code:

var tasks = Enumerable.Range(1, numberOfTasks).Select(async taskId =>
{
    var random = new Random(Environment.TickCount + taskId);
    while (ct.IsCancellationRequested == false)
    {
        await SendTestMessage(endpoint, destination, random).ConfigureAwait(false);
    }
}).ToArray();

on my laptop using 16 tasks and reached 2000 sustainable send throughput while running lots of other tasks (e.g. three instances of Visual Studio).

The official RabbitMQ PerfTest by default runs with publisher confirms disabled. This means that the sender continues to push messages as fast as the connection allows without waiting for ACKs that the messages have been durably written to disk.

Given that you probably don’t want to lose any of these messages, enabling publisher confirms seems to be the only option for you. NServiceBus does it by default because we prefer running slower than losing data.

When publisher confirms are enabled, the sender waits until it gets back the ACK before “forgetting” the send and can only allow so many outstanding (not-yet-acked) sends so the sending throughput is coupled to the broker being able to durably write messages and send back ACKs. That means that each send requires a full roundtrip.

I was able to reach 2000 msg/s on my laptop probably because the roundtrip time is almost zero on the same machine.

To have a fair comparison try running following two scenarios:

  • in a distributed environment (sender on different machine than receiver), run the PerfTest with publisher confirms enabled ( --confirm / -c)
  • run the NServiceBus-based load generator and PerfTest on the same machine as RabbitMQ broker

I am sure that the numbers will get more alike as there is not much NServiceBus does on top what the .NET RabbitMQ client does. The overhead of NServiceBus is adding a bunch of headers and serializing the payload.

Szymon

Hi,

I’ve been part of the team with which did performance tests some time ago. You can still access the code to those performance tests here: EndToEnd/src/PerformanceTests at master · Particular/EndToEnd · GitHub
It’s not easy code to read, since we’ve created many permutations for every transport, for just receiving messages, receiving and sending, every single transactionmode, etc, etc.

But we were able to get more than 10.000 messages per second while receiving and sending messages using NServiceBus. That was on a local desktop machine with quite nice hardware and fast SSD. However that doesn’t represent real-world scenarios, because we did not do anything with the messages. No database transactions, everything else turned off, etc.

These tests were however for internal use. We wanted to verify performance changes from version to version of NServiceBus, since we started adding async/await and .NET Core support. So sharing these numbers don’t add much to customers, as they’re running in different environments, etc. Running NServiceBus on a desktop or inside Docker or on a VM has very different results. Especially if these machines are running other software. For example adding NServiceBus storage to a SQL Server database that already runs very poorly due to lots of transactions from another business application, doesn’t do much good to either that application nor NServiceBus storage.

Having said that, Szymon already initially asked what it is you’re achieving. If you’re sending 2.7 million messages every-single-day from an on-premise to and off-premise system, it sounds like data synchronization. There are possibly tools available that are build explicitly for this purpose, like ETL tooling?

NServiceBus is more for continuous communication within a system. This prevents batch jobs at the end of the day. Instead, every single time something happens, you can immediately publish an event. The result is near real-time communication becomes possible within a system, instead of the nightly batch job. This could mean communication between the on-premise and off-premise application. The result would still be 2.7 million messages per day, but now spread over 24 hours. An average of 31.25 messages per second. Still data synchronisation (where another tool could still possibly fit better), but a lot less pressure and data is transferred near real-time.

If you want to have a discussion about this, that’s always possible. if I’m correct, you speak Dutch, as I do. Makes it even more simple :wink: Let me know if you’re interested. Either via a direct message or via support@particular.net or my personal email address dennis.vanderstelt@particular.net