Saga with Application layer / DDD

Hello,

I would like to use NSB Saga in DDD project. I’m using MessageSessionAdapter instance injected to BookingService to be able to publish events. That’s ok, but I have troubles with scheduling timeouts since RequestTimeout method is bound to Saga instance and requires context. Why it’s not included in IMessageSession interface and how I can call it from an application layer? It looks like NSB doesn’t support DDD philosophy at all. I consider NSB handlers as Asp.Net Core Controllers, so what I’m trying to do is to move the logic from handlers/saga handlers/api controllers to the application layer.

An example:

// NServiceBus

public class BookingSaga : Saga<BookingSagaData>, IAmStartedByMessages<StartBooking>, IHandleMessages<ConfirmBooking>, IHandleTimeouts<BookingTimeout>
{
    private readonly IBookingService bookingService;

    public BookingSaga(IBookingService bookingService)
    {
        bookingService = bookingService;
    }

    public async Task Handle(StartBooking command, IMessageHandlerContext context)
    {
        await bookingService.StartBooking(command.Id, Data);
    }

    public async Task Handle(ConfirmBooking command, IMessageHandlerContext context)
    {
        await bookingService.ConfirmBooking(this.Data.BookingId);
    }
    
    public async Task Timeout(BookingTimeout state, IMessageHandlerContext context)
    {
        await bookingService.CancelBooking(Data.BookingId, Data.TimeoutIndex);
        MarkAsComplete();
    }
}


// Infrastructure

public class MessageSessionAdapter : IMessageSessionAdapter
{
    private readonly IMessageSession session;

    public MessagePublisher(IMessageSession session)
    {
        session = session;
    }
    
    public async Task PublishBookingStartedEvent(Guid bookingId)
    {
        await session.Publish(new BookingStarted { BookingId = bookingId });
    }

    public async Task PublishBookingCancelledEvent(Guid bookingId)
    {
        await session.Publish(new BookingCancelled { BookingId = bookingId });
    }

    public async Task SetTimeout(int seconds)
    {
        // not possible: RequestTimeout requires the specific Saga instance and context
        Data.TimeoutIndex++;
        await session.RequestTimeout(context, TimeSpan.FromSeconds(seconds), new BookingTimeout());
    }
}

// Application

public class BookingService : IBookingService
{
    private readonly IMessageSession session;
    private readonly IBookingEntityFactory factory;
    private readonly IBookingRepository repository;

    public BookingService(IMessageSession session, IBookingEntityFactory factory, IBookingRepository repository)
    {
        session = session;
        factory = factory;
        repository = repository;
    }

    public async Task StartBooking(Guid bookingId, BookingSagaData data)
    {
        data.BookingId = bookingId;
        var entity = factory.CreateBookingEntity(data);
        entity.BookingLogic();
        await repository.Add(entity);
        if (entity.IsCancellable && entity.NotConfirmed) {
          await session.SetTimeout(60);
        }
        await session.PublishBookingStartedEvent(bookingId);
    }

    public async Task ConfirmBooking(Guid bookingId)
    {
        var entity = repository.Get(bookingId);
        entity.Confirm();
        await repository.SaveChanges();
    }

    public async Task CancelBooking(Guid bookingId, int timeoutIndex)
    {
        var entity = repository.GetBookingEntity(bookingId);
        entity.Cancel();
        await repository.SaveChanges();
        await session.PublishBookingCancelledEvent(bookingId);
    }
}

Hi @pinggi

Although IMessageHandlerContext might look similar to ASP.NET’s context, it actually is meant to sit in a different layer. The message handlers (classes that implement IHandleMessages) are meant to be your application services layer. There is usually not gain in abstracting the message handling context and creating another class for the application service.

Another thing that I noticed is that you are using Sagas but you don’t actually make use of Saga data much. NServiceBus Sagas can be used in two ways, either as process managers or as way of implementing your aggregates. In your booking example, you could have a BookingSaga that is itself a booking aggregate and contains all that information. Your BookingLogic() would be part of the saga data class, not a separate class.

Alternatively, if you don’t like implementing aggregates through sagas, I would recommend you to not use sagas here but just make your BookingService handle all the messages. You won’t have access to the Timeout API but you can instead send a delayed message to yourself.

Is there an advantage in using IMessageHandlerContext over registering IMessageSession impl (Endpoint instance) in a DI container?
With IMessageSession in the container I can have the application layer independent of NServiceBus.

NServiceBus Sagas can be used in two ways, either as process managers or as way of implementing your aggregates.

Thanks for mentioning it. It opens new ways of thinking about it.
In this case it seems I use the saga as a process manager.

If it was an implementation of an aggregate, the saga should really be a part of the application layer.
Also it seems to be a good idea to have the app services as actual handlers like you suggest:

// Application

public class BookingService : IHandleMessages<ConfirmBooking>
{
...

However, this would mean the application layer had to be dependent on a specific technology like NServiceBus. I consider NServiceBus more like a communication library used in a client app (a separate project creating an endpoint).

I can imagine a situation where there are more aggregates per process (a saga with one aggregate doesn’t fit here) or more clients (Asp. Net Core REST client, a console app client) using the application layer. They won’t call a logic hidden in a IHandleMessages.Handler method. At least they can’t pass IMessageHandlerContext there. It seems in this case the saga handler should call the application logic the same as the rest api client would do.

A picture how I imagine it (1) and what I understood you suggest (2 or 3). Consider the rest client, nsb client, application, domain and infrastructure to be separate projects.

1.
rest client (controllers) ->
nsb client (endpoint,sagas,handlers) ->
                                                  application -> domain
                                                              -> infrastructure
                                                                          -> db
                                                                          -> http
                                                                          -> nsb
2.
rest client (controllers) ->
nsb client (endpoint) ->
                                 application (sagas,handlers) -> domain
                                                              -> infrastructure
                                                                          -> db
                                                                          -> http
                                                                          -> nsb
3.
rest client (controllers) ->
                         nsb client (endpoint,sagas,handlers) -> domain
                                                              -> infrastructure
                                                                          -> db
                                                                          -> http
                                                                          -> nsb

you can instead send a delayed message to yourself.

Is there any difference between the delayed message and the timeout?
Can I use the delayed message as it would be a timeout or the timeout is safer or better for something.

However, this would mean the application layer had to be dependent on a specific technology like NServiceBus

Yes, that’s true, but the code for that service requires that you pass the MessageSessionAdapter which can be only implemented using a messaging technology. So application services do assume existence of messaging technology anyway. That’s why I am not sure how much value does does IMessageSessionAdapter add besides not having a reference to NServiceBus namespace.

Anyway, if you want to keep the application services layer independent of NServiceBus usage and go with the adapter approach, I think in this case of booking you can simplify your solution by not using the concept of sagas since you don’t use the saga state. It makes things more complex. Just create a class like NServiceBusBookingFacade and make it implement all the IHandle interfaces and pass the invocation further to the booking service.

You would not have access to the timeouts, but as I mentioned, you can use the plain delayed messages instead. Timeouts are in fact implemented as delayed messages. The only difference is that timeouts are automatically routed to the saga that requested them. Since you seem to not use the saga state here, there is no benefit of timeouts over plain delayed messages in your case.

How has making DB, HTTP and NSB calls anything to do with your domain layer? Or in other words why not change your architecture so that the domain layer executes the business logic and returns simple data structures that inform about a decision and based on that decision you execute the side effect occurring things like HTTP, DB calls and NSB calls outside the domain layer? If you do that then you also benefit from not needing to bring all the async/ await machinery into your domain code.

We have such an example in our upgrade guide from v5 to v6

Or in other words the NServiceBus handler does

async Task Handle(Message msg, HandlerContext context) {
    var decision =   domainLayer.Decide();
    foreach(var thingy in decision.FooBar) {
       context.Publish(new FooBarEvent());
    }
    await context.SendLocal(new DoHttpPost());

    await dbContext.TrackEntityChanges();
    await dbContext.SaveChangesAsync();
}

When Aggregate root is implemented as Saga Data does that imposes a limitation that now we can only send messages to aggregate via Message Broker? In other words, if i want to perform some command to aggregate modeled as Saga Data from Controller.Action, where i need an immediate feedback to return back as an Http Response, is there a way to achieve that ?

Hi

If you are using NHibernate persistence then you can load the aggregate the same way as any other NHibernate-managed object. In that case I would hand-craft the mapping to ensure that the same mapping is used by NServiceBus and by the controller (NServiceBus uses a convention-based mapping if there is no hand-crafted one).

It might be a bit more tricky when using SQL Persistence because then you need to write some plumbing code to load and store aggregates the same way as SQL Persistence does but it is definitely doable and boils down to JSON (de)serialization and management of the optimistic concurrency version field.