Partitioning messages by root aggregate ID to maintain single consumer semantics

Greetings,

I am evaluating NServiceBus for a new system of a local company, and I am wondering if the following can be achieved when using the RabbitMQ transport with Cosmos DB persistence?

I want to partition all messages by the root aggregate ID so that each consumer (i.e. an endpoint instance) will be responsible for processing messages from a range of root aggregate IDs based on RabbitMQ consistent hashing exchanges. This will ensure ordered messages per root aggregate semantics, which will simplify the processing of such messages while maintaining consistency. Also, no competing consumers are needed, therefore no concurrency concerns. Other supporting measures are also necessary:

A- Batched publishing needs to also preserve message order in the face of partial publishing failures.

B- Recoverability policy should not change the received message order due to partial processing failures by requeuing to the end of the processing queues.

C- Partially failed messages of an aggregate ID should not be moved to the error queue before storing the aggregate ID both in the database and in memory to stop processing any further messages with the same aggregate ID. Instead, these messages need to be moved to a special suspend queue until the previous failed message is recovered and fully processed from the error queue. If the database is not available for some reason, then the failed message should not be moved to the error queue until the database is available again in order to store the suspended aggregate ID.

D- Sagas will be used to ensure that only a single thread is responsible for handling the messages of a particular root aggregate. This will further ensure single consumer semantics.

E- During scaling out, RabbitMQ will perform rebalancing of the partitioned routing, where new consumers (i.e. added endpoint instances) are started, no processing by these consumers should commence until we confirm that existing consumers have consumed all previous messages for rebalanced aggregate IDs. I can use a send/reply from the new consumers to all old consumers right after RabbitMQ completes the rebalancing, since the sent message will be processed after all previous messages in their processing queues.

F- There is an assumption that RabbitMQ consistent hashing provides pseudo-random even distribution of partitioned messages over the consumers. In case of having a small percentage of aggregates with high write concurrency (i.e. generating much more messages), we can analyze the reasons and have them partitioned in their own consistent hashing space or sent to hot queues.

G- Consumers actually need to control the partitioning based on their needs if the basic by aggregate ID is not suitable for their processing patterns. Therefore, consumers need to specify the RabbitMQ route key and binding filters based on the message headers/contents.

Even synchronous request/response messages with write concerns coming from the end-user can be partitioned, but that is another topic.

Based on the above initial design points, is it possible to configure/customize NServiceBus routing/recoverability components to achieve them?

Many thanks in advance and sorry for the long message.

Hi,

NServiceBus is an opinionated framework based on best practices we’ve gathered over decades of developing distributed systems using SOA, DDD, and other architectural styles.

What you want to achieve isn’t possible with NServiceBus because of these reasons.

I’m not sure where your requirements originate from, but I’m pretty confident nothing out there can help satisfy your needs. So there’s nothing besides building this yourself, struggling for months or maybe years. I say this with tongue in cheek, but if you come to the same conclusion, call us, and we’ll be happy to help. If you succeed, we might want to hire you and help us extend NServiceBus :wink:

Hi,

It almost sounds like a distributed lock to achieve some sort of global ordering within an aggregate ID and then trying to reimplement that on top of queues and making it a system wide concern. To me that sounds not like a good idea but I probably don’t yet understand your actual use cases good enough based on that generic description.

Like @Dennis mentioned, trying to further go into your actual business cases might help.

Some sort of dynamic partitioning would be possible by applying a similar trick we did with the Service Fabric Partition-aware routing Service Fabric Partition-Aware Routing • NServiceBus Samples • Particular Docs

But that doesn’t give you the ordering you think you need. If ordering is really such a strict requirement (which in most cases it isn’t) you might want to look into building that part of the domain by leveraging an event stream per aggregate ID and using a technology like Kafka?

Regards,
Daniel