Event Sourcing - Creating an NServiceBus Event Publisher


(Craig) #1

Hi, just looking to validate our approach.

We’re building a platform that has different bounded contexts, some of which use event sourcing. We’re using SqlStreamStore for our event store. We’re looking to implement an Event Publisher (similar to the one described in Greg Young’s CQRS doc - https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf pg 48). On save to the event store, we store the MessageId (a newly generated GUID), RelatedTo (the current message’s ID), ConversationId, and CorrelationId.

Essentially, at the core, it would act similar to our Projectors, so we could use the same polling mechanism. When publishing the message, we would leverage the IEndpointInstance and publish the deserialized event and override the send options for MessageId, RelatedTo, ConversationId, CorrelationId.

Has anyone done anything similar? Is there any other metadata that needs to be overridden when publishing the message? Anything else to consider?

(Szymon Pobiega) #2

Hi Craig

NServiceBus makes some assumptions when you use IEndpointInstance that conflict with what you want to achieve. It does not allow you to manipulate the correlation and conversation headers.

In order to retain the conversation/correlation, you need to use the low-level IDispatchMessages component when publishing from the event store to the bus. This interface allows you to manipulate headers freely. The other good thing about IDispatchMessages is that it takes a byte[] as the message body, so you don’t need to deserialize and serialize again while publishing from the event store.

The headers you should set are the ones you mentioned (RelatedTo, ConversationId, CorrelationId) and some more:

  • ReplyToAddress should point to the input queue of the event-sourced endpoint (where the commands go)
  • NServiceBusVersion should contain the version of NServiceBus used by that endpoint
  • TimeSent should contain the time the event is published on the bus
  • EnclosedMessageTypes should contain the .NET type that represents the event
  • ContentType should contain the information about how the event has been serialized
  • OriginatingMachine/HostId/Endpoint should contain the information about the event-sourced endpoint

While these headers are not strictly mandatory in all cases, they make it easier for other components of the platform to operate e.g. when a recipient of the event wants to reply by sending a follow-up command.
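
To make the list above concrete, here is a minimal sketch of building that header set with the constants from the NServiceBus `Headers` class. `StoredEventMetadata` and `EventHeaders` are hypothetical names invented for this illustration, and the host ID format ("N") and the use of the assembly-qualified type name are assumptions; compare against the headers of a message sent by a regular endpoint before relying on them.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using NServiceBus;

// Hypothetical shape of the metadata saved with each event in the event store.
public sealed class StoredEventMetadata
{
    public string MessageId { get; set; }
    public string RelatedTo { get; set; }
    public string ConversationId { get; set; }
    public string CorrelationId { get; set; }
}

public static class EventHeaders
{
    // Builds the header dictionary for one event about to be put on the bus.
    public static Dictionary<string, string> Build(
        StoredEventMetadata meta,
        Type eventType,          // .NET type that represents the event
        string endpointName,     // name of the event-sourced endpoint
        string inputQueue,       // queue where that endpoint receives commands
        Guid hostId,             // host ID of the event-sourced endpoint
        string nsbVersion,       // NServiceBus version used by that endpoint
        string contentType,      // e.g. "application/json"
        DateTime publishedAtUtc) // time the event is published on the bus
    {
        return new Dictionary<string, string>
        {
            [Headers.MessageId] = meta.MessageId,
            [Headers.RelatedTo] = meta.RelatedTo,
            [Headers.ConversationId] = meta.ConversationId,
            [Headers.CorrelationId] = meta.CorrelationId,
            [Headers.ReplyToAddress] = inputQueue,
            [Headers.NServiceBusVersion] = nsbVersion,
            // Assumption: the timestamp wire format used by NServiceBus.
            [Headers.TimeSent] = publishedAtUtc.ToString(
                "yyyy-MM-dd HH:mm:ss:ffffff Z", CultureInfo.InvariantCulture),
            [Headers.EnclosedMessageTypes] = eventType.AssemblyQualifiedName,
            [Headers.ContentType] = contentType,
            [Headers.OriginatingMachine] = Environment.MachineName,
            [Headers.OriginatingHostId] = hostId.ToString("N"),
            [Headers.OriginatingEndpoint] = endpointName,
        };
    }
}
```

Passing the version, content type and publish time in as parameters keeps the helper free of guesses about how the endpoint discovers them.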

In order to get the instance of IDispatchMessages you need to create a Feature and a FeatureStartupTask that queries the event store periodically and publishes messages via the dispatcher. [Here’s] more information about the concept of the feature and a startup task.
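
A rough sketch of such a feature and startup task, assuming the NServiceBus 7 API surface (`Feature`, `FeatureStartupTask`, `IDispatchMessages`) and a transport with native publish/subscribe, since a multicast operation is handed straight to the dispatcher. `StoredEvent` and `IEventStoreReader` are hypothetical stand-ins for your own event store polling code, and the reader is assumed to be registered in the endpoint's container. Checkpointing of the last published position is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Extensibility;
using NServiceBus.Features;
using NServiceBus.Routing;
using NServiceBus.Transport;

// Hypothetical record read from the event store; Body holds the bytes as stored.
public sealed class StoredEvent
{
    public string MessageId;
    public Type EventType;
    public Dictionary<string, string> Headers;
    public byte[] Body;
}

// Hypothetical abstraction over the polling mechanism shared with the projectors.
public interface IEventStoreReader
{
    Task<IReadOnlyList<StoredEvent>> ReadNextBatch(CancellationToken token);
}

public class EventStorePublisherFeature : Feature
{
    public EventStorePublisherFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        // Resolve the dispatcher and the reader when the endpoint starts.
        context.RegisterStartupTask(builder => new PublisherTask(
            builder.Build<IDispatchMessages>(),
            builder.Build<IEventStoreReader>()));
    }
}

public class PublisherTask : FeatureStartupTask
{
    readonly IDispatchMessages dispatcher;
    readonly IEventStoreReader reader;
    readonly CancellationTokenSource stop = new CancellationTokenSource();
    Task loop;

    public PublisherTask(IDispatchMessages dispatcher, IEventStoreReader reader)
    {
        this.dispatcher = dispatcher;
        this.reader = reader;
    }

    protected override Task OnStart(IMessageSession session)
    {
        loop = Task.Run(() => Poll(stop.Token));
        return Task.CompletedTask;
    }

    protected override async Task OnStop(IMessageSession session)
    {
        stop.Cancel();
        try { await loop.ConfigureAwait(false); }
        catch (OperationCanceledException) { /* expected on shutdown */ }
    }

    async Task Poll(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var batch = await reader.ReadNextBatch(token).ConfigureAwait(false);
            foreach (var storedEvent in batch)
            {
                await dispatcher.Dispatch(
                    new TransportOperations(ToOperation(storedEvent)),
                    new TransportTransaction(),
                    new ContextBag()).ConfigureAwait(false);
            }
            if (batch.Count == 0)
            {
                // Nothing new: wait before querying the store again.
                await Task.Delay(TimeSpan.FromSeconds(1), token).ConfigureAwait(false);
            }
        }
    }

    // Wraps the stored bytes and headers without deserializing the event.
    public static TransportOperation ToOperation(StoredEvent storedEvent)
    {
        var message = new OutgoingMessage(
            storedEvent.MessageId, storedEvent.Headers, storedEvent.Body);
        return new TransportOperation(
            message, new MulticastAddressTag(storedEvent.EventType));
    }
}
```

On a transport without native publish/subscribe, the multicast tag would likely need to be replaced by one unicast operation per subscriber, which this sketch does not attempt.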

Another way of getting IDispatchMessages is to use the open source NServiceBus.Raw project. Keep in mind, though, that NServiceBus.Raw, while it might be easier to use for just sending raw messages, is not officially supported.

Hope it makes sense,

(Craig) #3

Thanks Szymon! This is great. Really appreciate you taking the time to answer this.

(Jimmy Bogard) #4

One quick thing to note - event-sourced messages are not the same as integration/service messages. Don’t publish these messages outside of your service boundaries. They are strictly for internal use only.