Is it possible to subscribe to an event by correlation id?

So it has come time to tackle this async response problem we’ve had since adapting this architecture. That is, when the web site submits a request, we respond with 202 and allow the command to pass through all the services it needs to get the job done. The problem is that the frontend doesn’t know if it actually succeeded or not.

The common solution to this is to submit the command and then subscribe to a response queue and wait for some sort of response from the server. What I’m investigating as a more real-time solution is the use of SignalR, now that it’s available in .net core. In this scenario, we have a couple options, I think:

  • Subscribe to a message hub and receive all messages from it. The frontend would continue to submit requests over the REST API separate of the message hub and just handle all messages from the server and sort out what it cares about. Complicated.
  • Have the frontend submit requests over the message hub which sends messages over NSB and then issues a callback that is run when a specific event is received. Less complicated.

What I’m envisioning is that the _session.Send() method allows a transient handler to respond to events that have the same correlation ID as the original request message. Maybe something like this:

public async Task SendMessage(MyRequest request)
{
    var user = await _userStore.GetById(request.UserId);
    if(user == null)
        throw UserDoesntExistException(request.User.Id);
    
    var command = new MyCommand(){ ... }
    var options = new SendOptions();
    options.RegisterCallback<Event1>(Stage1Update);
    options.RegisterCallback<Event2>(Stage2Update);
    
    await _bus.Send(command, options);
}

public async Task Stage1Update(MyResponse response)
{
    // assuming "caller" is captured as part of this closure...
    Clients.Caller.SendMessage(
        "SomeHandler", 
        new { Type = "Stage1Update", Result = response })
}

public async Task Stage2Update(MyResponse response)
{
    // assuming "caller" is captured as part of this closure...
    Clients.Caller.SendMessage(
        "SomeHandler", 
        new { Type = "Stage2Update", Result = response })
}

So what this would do is send the command and then assign callbacks that are execute for each event type. This seems redundant because I could just create event handlers in the normal fashion, but those would respond to all of those events which is problematic. The reason being, I will have multiple frontend clients sending different messages for different reasons and each of them only wants updates on the messages that were sent that respective client.

I think you get the idea.

So does this functionality already exist in some fashion? If not, do you have any suggestions?

It looks like this is has already been thought of in some capacity (Near Real-Time Transient Clients • NServiceBus Samples • Particular Docs), but the example is only a global message and I need something a bit more localized. The only other thing I could think of is using the example in the link above along with a cache of some sort where I can map a message id to a client and then the handler would just use that to figure out which client to send the messages to.

Hi,

Nothing like that exists, yet. My usual approach to such a scenario is,
regardless of the way comments are routed through, to:

  • allow clients to subscribe to event types/correlation ids via the SignalR
    API
  • whenever a client want can send a “subscribe request” to the SignalR Hub
  • the subscription request might contain a correlation id only, which means
    I’m interested in all messages with this correlation id
  • might contain an event type only, I’m interested in all events of this
    type regardless of the correlation id
  • a correlation id and an event type, I’m interested in a specific event
    type with a well known correlation id
  • the SignalR Hub stores the subscription request into its own storage,
    e.g. a custom key/value store
  • all events that in some way can be forwarded to the UI via SignalR
    implement a custom interface, e.g. lCanBeForwardedToUserInterface
  • the web app that hosts the SignalR Hub is subscribed to
    lCanBeForwardedToUserInterface, thus acts like a catch all for the
    lCanBeForwardedToUserInterface type.
  • whenever a message implementing lCanBeForwardedToUserInterface is
    received the handler checks the custom subscription storage and if a match
    is found the event is forwarded to the UI via SignalR

Depending if web front-ends are scaled out horizontally you might also need
a backplain to coordinate across multiple instances.

.m