Best practices to prevent blocking the queue with a stuck consumer (using domain events for eventual consistency)

Hello,

We are just beginning to use domain events in our system. These domain events have handlers which listen to the queue of domain events. Currently we have one queue (topic?) for all events and handlers. The problem I foresee is that if any one handler (and there can be many) cannot process an event, e.g. due to a logic error or a dependent service being down, the entire domain event queue gets held up. Ensuring that each event is processed at least once (ideally exactly once, but that’s a hard problem) is vital. However, I understand that if an event continuously fails to be processed (logic error or dependent service down), I should probably skip it or put it into a dead letter queue.

An example: I have two different domain event handlers, AddToDailySalesReportHandler and RecordSaleForAuditHandler, which both handle the domain event MachineMadeSale. Currently my subscriber reads from the domain event queue and calls each handler in turn. If the first handler (AddToDailySalesReportHandler) succeeds but the second fails, the subscriber considers the event unprocessed and retries it again and again, blocking the queue. Additionally, I either need logic in AddToDailySalesReportHandler to de-duplicate being called multiple times with the same MachineMadeSale event, OR whatever the handler does in response to the event must be naturally idempotent anyway.
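A minimal sketch of the de-duplication option, assuming every MachineMadeSale carries a unique `eventId` (an assumption; all field names below are hypothetical). The handler remembers which event ids it has already applied and ignores redeliveries. The in-memory `ProcessedEventStore` only keeps the example runnable; in practice the processed-id record would live in the database and be committed in the same transaction as the report update.

```typescript
// Hypothetical shape of the MachineMadeSale event; field names are assumptions.
interface MachineMadeSale {
  eventId: string; // assumed unique per event, e.g. assigned when written to the outbox
  machineId: string;
  amountCents: number;
  occurredAt: string; // ISO-8601 timestamp, e.g. "2024-05-01T10:00:00Z"
}

// Stand-in for a "processed events" table. In a real system, marking an event
// as processed must commit atomically with the handler's own state change.
class ProcessedEventStore {
  private readonly seen = new Set<string>();

  has(eventId: string): boolean {
    return this.seen.has(eventId);
  }

  add(eventId: string): void {
    this.seen.add(eventId);
  }
}

class AddToDailySalesReportHandler {
  // Daily sales totals keyed by "machineId|YYYY-MM-DD".
  readonly totals = new Map<string, number>();

  constructor(private readonly processed: ProcessedEventStore) {}

  handle(event: MachineMadeSale): void {
    // A redelivered event (e.g. because another handler failed and the whole
    // message was retried) is ignored instead of being counted twice.
    if (this.processed.has(event.eventId)) return;

    const day = event.occurredAt.slice(0, 10); // "YYYY-MM-DD"
    const key = `${event.machineId}|${day}`;
    this.totals.set(key, (this.totals.get(key) ?? 0) + event.amountCents);

    this.processed.add(event.eventId);
  }
}
```

Calling `handle` twice with the same event leaves the daily total unchanged, which is what makes a retry caused by a failing sibling handler safe for this handler.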

But is there a better way than blocking the entire domain event queue? One solution, though I don’t know how easy it is to implement, is to have one queue listener per handler, with each listener keeping track of its own position in the queue. I don’t know how NServiceBus handles this, because I thought that once a consumer consumes a message, it is removed from the queue. You could do this with Kafka: each handler would effectively be its own consumer group and store its own topic offset(s)/cursor.
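The consumer-group-per-handler idea can be modelled in a few lines. This is a hypothetical in-memory sketch, not Kafka itself: each `ConsumerGroup` keeps its own offset into a shared log and only advances it after its handler succeeds. With kafkajs, each group would roughly correspond to a separate `kafka.consumer({ groupId })` subscribed to the same topic.

```typescript
// In-memory model of one Kafka topic consumed by one consumer group per handler.
// All names are hypothetical; this illustrates offsets, not the Kafka client API.
type DomainEvent = { id: string; type: string };
type Handler = (event: DomainEvent) => void;

class InMemoryLog {
  readonly events: DomainEvent[] = [];

  append(event: DomainEvent): void {
    this.events.push(event);
  }
}

class ConsumerGroup {
  // Position of the next event to process; plays the role of a committed offset.
  offset = 0;

  constructor(readonly groupId: string, private readonly handler: Handler) {}

  // Processes events from this group's own offset. On failure it stops and
  // keeps the offset, so only this group retries that event on the next poll.
  poll(log: InMemoryLog): void {
    while (this.offset < log.events.length) {
      try {
        this.handler(log.events[this.offset]);
      } catch {
        return;
      }
      this.offset += 1; // "commit" only after the handler succeeded
    }
  }
}
```

A failing audit handler only holds back the audit group; the report group keeps going. A permanently failing event still blocks its own group, though, so per-group retry limits or dead-lettering are still needed.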

Additionally, to avoid bringing down the entire system, I could use different queues/topics for different kinds or categories of domain event (e.g. one topic for all sale-related events, one for all inventory-related events).
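One way to express the per-category split is a routing table from event type to topic, consulted by whatever publishes the events; consumers then subscribe only to the topics for their category. The event types and topic names below are illustrative assumptions.

```typescript
// Hypothetical routing table: event type -> category topic. A stuck consumer
// on one category's topic then cannot delay consumers of other categories.
const topicByEventType = new Map<string, string>([
  ["MachineMadeSale", "domain-events.sales"],
  ["SaleRefunded", "domain-events.sales"],
  ["StockLevelChanged", "domain-events.inventory"],
]);

function topicFor(eventType: string): string {
  const topic = topicByEventType.get(eventType);
  if (topic === undefined) {
    // Fail loudly rather than silently publishing to a topic nobody consumes.
    throw new Error(`No topic configured for event type "${eventType}"`);
  }
  return topic;
}
```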

The big reveal: I am actually using Node.js + TypeScript, and my understanding is that I cannot use NServiceBus, as it’s only for .NET. I’m not sure there is any equivalent to NServiceBus in the Node/JS world. We ingest millions of telemetry signals from IoT devices every day, so we use Kafka, with different services acting as different Kafka consumers, each maintaining its own position in the topic. I have seen the Particular Software article “Let's talk about Kafka” and agree with most of what is said, but (i) I’m still not convinced that moving away from Kafka is smart, and (ii) I think using a different consumer group per handler would be a good solution.

So questions:

  1. Does NServiceBus have the concept of multiple consumers (domain event handlers in my case) that won’t block the message queue, and will the message remain until all consumers have successfully processed it?
    I find it hard to find people talking about this on the internet. Is that because most devs don’t care, or am I doing something wrong?
  2. Should I use something other than Kafka for my domain event message queue? If so, please recommend something for the Node.js/JS/TS world.

I’m new to this and trying not to over-engineer, but also not to leave gaps in my system. Not processing messages means my data is inconsistent. My domain events are primarily used for eventual consistency.

Thanks

For those who care about the details: I use a ‘domain event outbox’ table in my database. When domain events are responses to entity mutations, I write to the domain event outbox in the same transaction as the writes to the other tables that persist the entity. A Debezium change data capture (CDC) connector then listens for changes to the outbox table and publishes them to a Kafka topic. This Kafka topic is my domain event ‘message’ queue.
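A minimal sketch of the outbox write, assuming a database client whose transaction handle exposes a `query(sql, params)` method (the `Tx` interface is hypothetical, loosely shaped like node-postgres' `client.query`). Table and column names are also assumptions, and the caller is assumed to open and commit the transaction.

```typescript
// Hypothetical transaction handle; the caller is assumed to BEGIN and COMMIT.
interface Tx {
  query(sql: string, params: unknown[]): Promise<void>;
}

interface Sale {
  saleId: string;
  machineId: string;
  amountCents: number;
}

async function recordSale(tx: Tx, sale: Sale, eventId: string): Promise<void> {
  // 1. Persist the entity change.
  await tx.query(
    "INSERT INTO sales (sale_id, machine_id, amount_cents) VALUES ($1, $2, $3)",
    [sale.saleId, sale.machineId, sale.amountCents],
  );
  // 2. Write the domain event to the outbox in the SAME transaction, so either
  //    both rows commit or neither does. CDC (e.g. Debezium) only ever sees
  //    committed outbox rows and forwards them to Kafka.
  await tx.query(
    "INSERT INTO domain_event_outbox (event_id, event_type, aggregate_id, payload) VALUES ($1, $2, $3, $4)",
    [eventId, "MachineMadeSale", sale.saleId, JSON.stringify(sale)],
  );
}
```

Because both inserts share one transaction, there can never be an outbox row for a sale that was rolled back, nor a committed sale without its event.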

EventHandlers are hosted in an endpoint. Each endpoint has its own queue. You are correct that if one of the handlers for a certain event in a single endpoint throws an error, the whole handling of the event is considered failed.
However, no one forces you to host all handlers for a given event in a single endpoint. If you spread the two handlers across different endpoints, each focused on its own domain (e.g. one for reporting and one for auditing), both endpoints receive their own copy of the event in their own queue, and each can handle (and, if needed, retry) it completely independently of the other.
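To make the endpoint model concrete, here is a hypothetical in-memory sketch (not the NServiceBus API): publishing an event puts a separate copy into the queue of every subscribed endpoint, and each endpoint retries its own head-of-queue message without affecting the others.

```typescript
// Hypothetical in-memory model of "one queue per endpoint"; not a real broker API.
type PublishedEvent = { id: string; type: string };

class Endpoint {
  readonly queue: PublishedEvent[] = [];

  constructor(readonly name: string, private readonly handle: (e: PublishedEvent) => void) {}

  // Tries the message at the head of this endpoint's queue. Returns true if it
  // was handled and removed; on failure it stays queued for a later retry here,
  // while other endpoints' queues are untouched.
  processNext(): boolean {
    const next = this.queue[0];
    if (next === undefined) return false;
    try {
      this.handle(next);
    } catch {
      return false;
    }
    this.queue.shift();
    return true;
  }
}

class Broker {
  private readonly subscribers = new Map<string, Endpoint[]>();

  subscribe(eventType: string, endpoint: Endpoint): void {
    const list = this.subscribers.get(eventType) ?? [];
    list.push(endpoint);
    this.subscribers.set(eventType, list);
  }

  publish(event: PublishedEvent): void {
    for (const endpoint of this.subscribers.get(event.type) ?? []) {
      endpoint.queue.push({ ...event }); // each endpoint gets its own copy
    }
  }
}
```

In Kafka terms, the per-handler consumer groups described in the question reach the same independence by keeping separate offsets into one topic rather than copying the message.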

Thanks. To confirm: an endpoint is just a software component that contains one or more message handlers, correct?

What is the general guidance for deciding how many endpoints to use? There are ‘natural’ endpoints, e.g. if you have multiple microservices you might have one endpoint per microservice, but if you have a monolithic application with tens or hundreds of handlers, is there any guidance?

In my case I have one large monolith. Within it I expect to have hundreds of domain event handlers to enable eventual consistency between aggregates (entities) and to build projections/read models. I’m concerned about the fragility of my system. Many of these handlers must process the domain events correctly so that the system is consistent and, crucially, the data is correct (many read models are used for financial accounting). If a handler fails to process an event (bad code or unexpected input), it throws an error and the message is reprocessed. If it can never process that message successfully, it keeps throwing errors, and the system effectively breaks because it cannot move past that message.

One way to deal with this is a dead letter queue, but unless all the handlers in an endpoint apply their state changes transactionally, you end up with a system that partially processes the message.
For example, if an endpoint has 10 handlers, the first 5 process the event, handler 6 gets stuck, and handlers 7, 8, 9 and 10 miss out. Even if you write logic so that 7, 8, 9 and 10 also process the event, the system would consider the message processed even though handler 6 never processed it, and deciding how to handle this later, after ‘fixing’ handler 6, may be difficult.

In ‘real’ systems, what are the best practices for organizing endpoints and handlers, and for dealing with failure?

Thanks

Generally speaking, a system should be split up by domain or business logic.
Messages that fail are usually retried a predefined number of times; once that limit is reached, they can be moved to the error queue and dealt with there, so that they don’t block the system.
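A sketch of the retry-then-error-queue pattern, assuming synchronous handlers for brevity. `processWithRetries`, `maxAttempts` and the in-memory `errorQueue` are hypothetical names; with Kafka the error queue would typically be a separate dead-letter topic.

```typescript
// Hypothetical message shapes for the sketch.
type QueuedMessage = { id: string; body: string };
type FailedMessage = { message: QueuedMessage; error: string; attempts: number };

function processWithRetries(
  message: QueuedMessage,
  handler: (m: QueuedMessage) => void,
  errorQueue: FailedMessage[],
  maxAttempts = 3,
): boolean {
  let lastError = "";
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(message);
      return true; // success: the caller can acknowledge/commit and move on
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  // Give up on this message so it no longer blocks the ones behind it, but keep
  // it (with the reason) so it can be inspected and replayed after a fix.
  errorQueue.push({ message, error: lastError, attempts: maxAttempts });
  return false;
}
```

Real implementations usually wait between attempts (for example with exponential backoff); NServiceBus's recoverability feature, for instance, combines immediate and delayed retries before moving a message to the error queue.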

There’s a lot to cover - I suggest having a look at the basic principles of distributed systems. There’s a link there to a video about service boundaries which could be of interest.

I would also highly recommend taking the Advanced Distributed System Design course. Depending on where you live, you could attend in person or look at the online version. There’s a list of topics that are covered - it’s not based on NServiceBus but more on general distributed system design which I think you’ll find very helpful.