Finding a clean way to get feedback to the UI

Per @andreasohlund's request, I'm moving this here from Gitter for wider discussion. Forgive the novel; I'm just getting this thread caught up with the Gitter conversation.

NSB 7.0 w/RabbitMQ Transport using Conventional Topology

In regards to Forwarding Events to UI Problem Statement.md · GitHub

Per the suggestion, I've removed the inheritance and have instead plugged into the pipeline to create the behavior of "copying" the message to an endpoint specified in a custom header, if one is present. This approach is a little less obtrusive in that I don't have to tag my events with a marker interface (e.g. IUpdateUi).

To summarize the problem domain:
Right now, any downstream service that completes a process can publish an event (e.g. UserCreated or CreateUserFailed) to the bus, and any service that is interested in that particular event can subscribe to it.

If the task being sent into the system starts with a client (e.g. an application UI), then that client may want to receive feedback asynchronously from any services involved in that transaction. For example, the UI asks the system to create a user. If something in that chain fails, our Saga will have to handle what to do; but if the operation fails completely, the UI needs to know so that it can tell the user that the user was not created. That way our UX might be able to do something like pop up a toast notification that takes them to a screen with the original information for them to correct and try again, or something along those lines. Whatever our UX team decides is appropriate.

Where we’re at in the conversation:
My current implementation of the suggestion is to hook into the Publish pipeline. When the UI sends a command to our Gateway, we create the command and Send it on the bus, but we also add in some custom headers: UserId, TaskId, and ResponseEndpoint. That response endpoint is actually a SignalR message hub that is set up as an NSB endpoint.
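
To make that concrete, here is a rough sketch of how the Gateway attaches those headers when it sends the command. The header names, the "SignalRHub" endpoint name, and the CreateUser message are only placeholders, not the exact names from our code:

    // Sketch only: header names, values, and message type are illustrative.
    var options = new SendOptions();
    options.SetHeader("Arsenal.UserId", "some-user-id");
    options.SetHeader("Arsenal.TaskId", "some-task-id");
    // The NSB endpoint backing the SignalR hub (placeholder name)
    options.SetHeader("Arsenal.ResponseEndpoint", "SignalRHub");

    // messageSession is the Gateway's IMessageSession
    await messageSession.Send(new CreateUser { /* ... */ }, options)
        .ConfigureAwait(false);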

I implemented Behavior<IOutgoingPublishContext> and inserted it into the pipeline:

    public class ForwardToResponseHub : Behavior<IOutgoingPublishContext>
    {
        private readonly string _destinationHeader;
        public const string Description = "Copy events to an endpoint as a command";

        public ForwardToResponseHub(string destinationHeader)
        {
            _destinationHeader = destinationHeader;
        }

        public override async Task Invoke(IOutgoingPublishContext context, Func<Task> next)
        {
            if (context.Headers.TryGetValue(_destinationHeader, out var responseHub))
            {
                // BUG? Context isn't maintaining headers so I'm manually copying them over
                var options = new SendOptions();
                options.SetDestination(responseHub);
                foreach (var header in context.Headers)
                {
                    if (header.Key.StartsWith("Arsenal"))
                        options.SetHeader(header.Key, header.Value);
                }

                // I'm using a special message to wrap this because the original message is an event
                // but the reply goes to a specific address which is a command. If you try to "send"
                // an event, it will throw. By wrapping the message content in a Command, we avoid that
                var messageJson = JsonConvert.SerializeObject(context.Message.Instance);
                await context
                    .Send<ResponseHubReply>(c => { c.Message = messageJson; }, options)
                    .ConfigureAwait(false);
            }

            await next().ConfigureAwait(false);
        }
    }

The thought behind this (which is currently in contention) is that as the services publish events indicating that something happened (e.g. UserCreated or CreateUserFailed), this forwarder checks whether there is a ResponseEndpoint in the headers. If so, it wraps the event in a command and sends it straight to the endpoint indicated in the header, which in this case is the SignalR hub. That hub then uses the remaining headers (UserId and TaskId) to blindly forward those messages to the user that started the task.
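
For completeness, the behavior gets registered on the endpoint roughly like this (the header name is the same placeholder as above):

    // Register the forwarding behavior in the outgoing pipeline.
    endpointConfiguration.Pipeline.Register(
        new ForwardToResponseHub("Arsenal.ResponseEndpoint"),
        ForwardToResponseHub.Description);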

The point of contention raised by @andreasohlund is that this should probably be done in the receive pipeline, which I'm having a hard time understanding at the moment:

Feels off that you send the responses as part of publishing. What if you receive a message that should trigger UI feedback that it was complete, but it would, let's say, just update a DB? In that scenario you would not send the feedback message.

I thought doing it in the publish pipeline made sense because essentially what I'm doing is splitting the message; the event goes to the bus but it also goes directly to the indicated endpoint. The event goes out to the bus as per usual so that any explicit subscriptions are fulfilled (e.g. Sagas), but I also ensure that the message goes straight into the SignalR hub's queue. Any other service involved in the transaction would do the same; any events that came out of their operation would make it back to the UI.

@andreasohlund, you're suggesting that I do it in the receive pipeline, which I'm having trouble understanding. The receive pipeline would be on a receiver of the event, correct? If nothing else is subscribed directly to that event then, if I'm understanding correctly, that pipeline would never be touched and thus the message would never make it to SignalR.

I thought the original suggestion was to remove the inheritance approach so that the SignalR hub didn't have to subscribe to IUpdateUi. I liked that suggestion because I felt it was a little less obtrusive.

I think that even if the message is just that a database was updated (e.g. event = UserUpdated), I would still send that feedback to the UI. Why? Maybe they have a spinner on the page and that was the event we decided would indicate that the operation was complete (in the case where there is no associated Saga to give a more precise event, for example). That event would tell them to remove the spinner.

I might have missed something, but it seems to me that it's the successful processing of the message coming from the UI that is the important part here?

It is, but for our project, that can get more complex than only alerting success or only alerting failure. Here are some points I was thinking of:

  • The UI wants to know about success. Just tell the UI if something is done
    • Simple tasks that are not part of a Saga
    • Distributed tasks that are part of a Saga (many events)
      • Maybe the UI wants to show progress (imagine a % indicator or a checklist). For that, they would be interested in several events for a single task
      • This could be useful for troubleshooting. If the UI kept a checklist and those items were checked off as the events came rolling in for that taskId, those that failed could turn into a red X or something and we'd know exactly which step it failed on.
  • The UI wants to know about failure
    • An error comes out for a task the UI is waiting on (I use the term ‘waiting’ loosely) because of something the user can fix. Show a toast notification that an error happened. Click on it to go back to a page that will allow them to correct something and resubmit.
    • An error comes out for a task the UI is waiting on and it’s not something the user can fix. At least notify them so that they know the task was not completed.

/discuss :rofl::laughing:

I might have interpreted the requirements incorrectly, I assumed that:

For messages that originate from the UI you want to provide feedback that the "command" succeeded or failed (and potentially progress).

If that's true, emitting that feedback as part of processing the message feels most natural to me?

Is there a specific reason to avoid using Reply in the message processing, as that should naturally provide the correlation information back to the client?
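
For reference, the Reply approach would be roughly this inside the handler that processes the command (the message types here are only examples):

    public class CreateUserHandler : IHandleMessages<CreateUser>
    {
        public async Task Handle(CreateUser message, IMessageHandlerContext context)
        {
            // ... create the user ...

            // The reply goes back to the endpoint that sent CreateUser (or to an
            // overridden reply-to address); correlation headers flow automatically.
            await context.Reply(new CreateUserResponse())
                .ConfigureAwait(false);
        }
    }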

Oh ok, I think I see what you’re saying and I think we’re saying the same thing from two different angles.

So if we're talking about the receiver of the command, then that is the receiver and thus the Receive pipeline. I think the way I was looking at it was: yes, it is the Receive pipeline until it comes time to publish an event (indicating success or failure), at which point it becomes a publishing pipeline, which is where I hooked in. Like this:

[image: PublishReceive]

Having clarified that, what did you have in mind?

A couple of reasons came to mind:

  • Reply is the synchronous API, correct? I'm not trying to create a synchronous messaging system; I'm actually trying to follow a typical async pattern of "send command, subscribe to response queue," but using a WebSocket, which is also pretty common. The first command starts the process, and then participants of that process can give feedback through events. Now that I type it out loud… if SignalR were integrated into the Saga somehow, then the Saga could be responsible for sending messages to a reply endpoint like SignalR. This, of course, doesn't cover cases where there is no Saga.

  • In another thread, I identified that even in the synchronous scenario (which we do have a couple of cases for), Reply is still troublesome because the endpoint has to be uniquely identifiable, which gets really messy in a microservice environment such as Kubernetes, where containers come up and go down and scale out and in automatically. Significant infrastructure scripting would have to be in place to keep that under control, lest you end up with unique queues/exchanges scattered about the RMQ server, and then your messaging solution ends up coupled to your container orchestrator, which isn't great. Honestly, I think this is where a topic topology could really help keep things neat, but that isn't an option at this time. I tried to offer a solution using Redis here: GitHub - jstafford5380/NServiceBus.Callbacks.Redis: Callback pattern implementation using Redis.

  • Assuming I used that solution, don't replies still only go to the exact service that sent the message being replied to? Wouldn't that mean that replies would have to unwind back up the graph to make it back to the original requester? The reality of the solution is that because NSB is .NET-only, commands from NodeJS (or any other stack) need to come in at a .NET API, which creates the command and places it on the bus. So if the Gateway creates the command, it would have to be the one receiving the reply; however, the Gateway and the SignalR hub aren't always the same service, so whether the downstream services send messages to SignalR directly or a parent process managing a Saga does it for them, we still have to get messages forwarded to SignalR (or some other endpoint). The pattern I have above does that while also allowing the response endpoint to be specified at the top of the call chain.

Reply is the synchronous API, correct?

No, the reply just results in a message going back to the endpoint that sent the message being replied to. You could even override the reply-to address to have those replies go back to a "SignalR feedback" endpoint. That endpoint could use e.g. a Redis backplane to make sure that things work as expected if your web fronts are scaled out.
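
Roughly like this when the original command is sent (the endpoint name is just an example):

    var options = new SendOptions();
    // Replies to this command go to the feedback endpoint instead of back to the sender.
    options.RouteReplyTo("SignalRFeedback");

    await messageSession.Send(new CreateUser { /* ... */ }, options)
        .ConfigureAwait(false);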

It still feels a bit off to me that it's the publishing of an event that decides the success or failure of a UI command. What if you have a command that doesn't result in an event?

I.e. it feels more natural to me that successfully processing a message is the thing that decides whether it was successful or not?

No, the reply just results in a message going back to the endpoint sending the message being replied to.

But is it the endpoint that originally sent the message, or is it the endpoint that sent the message to that particular service? What I mean is: if the UI sends a message to service A, which handles the original command, and A spawns a command to service B, and B spawns one to C, and C replies, would that simply be replying to B but not A? If it were a Saga, then I see it would be the Saga that actually replied, but without a Saga, wouldn't I have to modify the reply-to address at every touch point?

Also, correct me if I’m wrong, but don’t replies require that I make the endpoints uniquely identifiable? I thought that was a requirement of the callbacks package. We discussed that one in a different thread (Callbacks uniquely identifiable endpoint concerns when deployed in docker - #26 by andreasohlund)

I don’t really want to commit to using that package with that behavior. I could use my Redis package in place of that, but that actually is a synchronous call. I might be able to modify or extend it to simply forward to an endpoint. I’ll need to churn on that one a bit.

I guess I had simply planned on just getting into the habit of always firing success and error events for all commands, but I kinda see what you’re getting at now. Do you think it is unreasonable or bad practice to always fire success/error events as a system requirement?

In the meanwhile, let’s go back to the receive pipeline thought that you had. How did you envision that working?

No, replies are part of the standard API

You only need the callbacks package, which forces you to make instances uniquely addressable, if you need to use in-memory callbacks.

But is it the endpoint that originally sent the message, or is it the endpoint that sent the message to that particular service? What I mean is: if the UI sends a message to service A, which handles the original command, and A spawns a command to service B, and B spawns one to C, and C replies, would that simply be replying to B but not A? If it were a Saga, then I see it would be the Saga that actually replied, but without a Saga, wouldn't I have to modify the reply-to address at every touch point?

Yes that is correct, the reply goes back to the endpoint that sent the message being replied to.

In our Gitter thread we talked about dealing with this by using a custom "UIFeedbackEndpointAddress" header that you can "send" the UIFeedback message to. This would work across endpoint hops if you add a behavior to forward it to outgoing messages, similar to how we propagate our own ConversationId. See https://github.com/Particular/NServiceBus/blob/develop/src/NServiceBus.Core/Causation/AttachCausationHeadersBehavior.cs#L35 for more details.

In short: You would context.Send to this address explicitly instead of doing a reply.

Does this make any sense?

Do you think it is unreasonable or bad practice to always fire success/error events as a system requirement?

Feels a bit like an infrastructure-level requirement that leaks into your business code?

In the meanwhile, let’s go back to the receive pipeline thought that you had. How did you envision that working?

I would just add a behavior to the receive pipeline (a plain message handler would work as well) that would check for a UIFeedbackId + UIFeedbackEndpointAddress header and just send a UIFeedback message with the ID to that address (sketched below). If the incoming message is processed OK, the message will be emitted.

Do you need to report failure as well?

If yes, subscribing to the MessageFailed event and sending a UICommandFailed message from there might do the trick?
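
A rough sketch of the behavior idea (the header names and the UIFeedback message are placeholders):

    public class UIFeedbackBehavior : Behavior<IIncomingLogicalMessageContext>
    {
        public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
        {
            // Let the rest of the pipeline (including the handlers) run first; if it
            // throws, the exception propagates and the feedback below is never sent.
            await next().ConfigureAwait(false);

            if (context.Headers.TryGetValue("UIFeedbackId", out var feedbackId) &&
                context.Headers.TryGetValue("UIFeedbackEndpointAddress", out var address))
            {
                var options = new SendOptions();
                options.SetDestination(address);

                await context.Send(new UIFeedback { FeedbackId = feedbackId }, options)
                    .ConfigureAwait(false);
            }
        }
    }

    public class UIFeedback : ICommand
    {
        public string FeedbackId { get; set; }
    }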

A little bit, but less so than having to ensure all endpoints are Reply'ing to the messages. I'm trying to achieve more of a "copy the message to a specific endpoint" kind of approach. I'm just trying to make sure I'm not breaking some important semantic in the process :slight_smile:

In short: You would context.Send to this address explicitly instead of doing a reply.

I think this is a bit more obtrusive. Now all endpoints that want to give feedback have to do so explicitly, where I actually want it to be more passive. If I do it by creating a package to handle this, that's basically where I'm at right now, but I'm doing it in the publish pipeline of the receiving service.

I would just add a behavior to the receive pipeline (a plain message handler would work as well) that would check for a UIFeedbackId + UIFeedbackEndpointAddress header and just send a UIFeedback message with the ID to that address. If the incoming message is processed OK, the message will be emitted.

Ah ok I think I see what you’re saying. So this is kind of like acking, but with a message? I think I can work with that concept. Lemme see what I can come up with.

Thank you for continuing this conversation. I’m learning a lot more about NSB’s capabilities and philosophy/semantics by solving real world problems like this :slight_smile:

Oh, so what should I be extending to implement the suggestion of plugging into the Receive pipeline so that I can handle successfully processed messages? Is it just Behavior<IIncomingLogicalMessageContext> and a check that the message was handled?

Update: never mind, I just found Pipeline events • NServiceBus • Particular Docs, but the delegate doesn't expose anything I can use to send a message from within it.

Ok, so the receive pipeline (on completed) doesn't have a native context to use for sending messages out after a message is successfully handled. I had to do some factory shenanigans to make that work, which I'm not a fan of. It would be nice if I had access to the NSB container or some way to have it injected. Perhaps there's some other way to plug into this that would allow injection?

Other than that, I think this is behaving the way I hoped it would, but I did notice that headers are not being copied between requests. For example, if the UI sends a command to A and A sends a command to B, it appears that all custom headers are lost between A and B. Is that to be expected?

Example:

public class Stage1Handler : IHandleMessages<Stage1Command>
{
    public async Task Handle(Stage1Command message, IMessageHandlerContext context)
    {
        if (!message.S1Succeed)
            throw new Exception("FUNCTIONAL TEST INDICATED TEST SHOULD FAIL");

        await context
            .Send<Stage2Command>("ServiceB", command =>
            {
                command.ShouldSucceed = message.S2Succeed;
                command.Message = "Command from Producer A";
            })
            .ConfigureAwait(false);
    }
}

This would be the handler that deals with the first command that comes from the UI. It sends another command to another service. When service B receives the message, all of the custom headers are missing. Isn’t the context supposed to keep those intact?

You mean to have access to a “.Send” on the context passed to the behaviour? (can you share the code that you had to write?)

You should be able to use a plain message handler for this purpose?

For example, if the UI sends a command to A and A sends a command to B, it appears that all custom headers are lost between A and B. Is that to be expected?

This is by design since we can’t know which headers should be copied. Like I mentioned previously you can solve this by adding a custom behaviour that copies the relevant headers from the incoming message to all outgoing messages, see:

https://github.com/Particular/NServiceBus/blob/develop/src/NServiceBus.Core/Causation/AttachCausationHeadersBehavior.cs#L35
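
In your case that could look something like this (reusing the "Arsenal" prefix from your earlier snippet as the example):

    public class PropagateCustomHeadersBehavior : Behavior<IOutgoingLogicalMessageContext>
    {
        public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
        {
            // If this message is being sent while processing an incoming message,
            // copy the relevant custom headers forward.
            if (context.TryGetIncomingPhysicalMessage(out var incoming))
            {
                foreach (var header in incoming.Headers)
                {
                    if (header.Key.StartsWith("Arsenal") && !context.Headers.ContainsKey(header.Key))
                    {
                        context.Headers[header.Key] = header.Value;
                    }
                }
            }

            return next();
        }
    }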

The code I wrote for the receive pipeline based on the doc I found:

endpointConfiguration.Pipeline.OnReceivePipelineCompleted(async completed =>
{
    if (!completed.ProcessedMessage.Headers.TryGetValue(CustomHeaders.TaskId, out var taskId) ||
        !completed.ProcessedMessage.Headers.TryGetValue(CustomHeaders.UserId, out var userId) ||
        !completed.ProcessedMessage.Headers.TryGetValue(CustomHeaders.ResponseQueue, out var respondTo)
    ) return;
    
    var options = new SendOptions();
    options.SetDestination(respondTo);

    var messageSession = sessionFactory(); // temporary solution
    await messageSession
        .Send<ResponseHubReply>(c =>
        {
            c.TaskId = taskId;
            c.UserId = userId;
            c.Message = Encoding.UTF8.GetString(completed.ProcessedMessage.Body);
        }, options)
        .ConfigureAwait(false);
});
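
For context, the "factory shenanigans" boils down to something like this simplified sketch: a holder that gets populated after the endpoint starts so the callback (wired up before start) has a session to send from. The exact wiring in my code differs a bit:

    IMessageSession messageSession = null;
    Func<IMessageSession> sessionFactory = () => messageSession;

    // ... OnReceivePipelineCompleted registered on endpointConfiguration as above ...

    var endpointInstance = await Endpoint.Start(endpointConfiguration)
        .ConfigureAwait(false);

    // IEndpointInstance implements IMessageSession, so the callback can now send.
    messageSession = endpointInstance;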

What do you mean when you say I could use a plain message handler?

Gotcha. Ok that’s similar to what I was basically doing at this point, but I wanted to make sure I wasn’t missing something.

Almost there!

What do you mean when you say I could use a plain message handler?

What I meant is that since NServiceBus has what we call batched dispatch, messages won't be emitted should anything throw during the pipeline invocation.

I.e. you can just create a plain IHandleMessages handler that runs the code you have in OnReceivePipelineCompleted and achieve the same thing with much less code.

Does that make sense?

All that said, if you want to emit a message as a result of the MessageFailed event, you would still need that code anyway :frowning:

I see. So in IHandleMessages<T>, what is T then? I tried implementing IHandleMessages<ReceivePipelineCompleted> but that's not working. It's not getting hit, and I noticed it doesn't implement IMessage or IEvent. It looks the same for IHandleMessages<FailedMessage>.

Use object as the T to have the handler fire for all incoming messages

For completeness, if you are using ServiceControl, dealing with failures would be as easy as hooking your "SignalR UI Feedback" endpoint up to the MessageFailed event emitted by ServiceControl.
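
A minimal sketch of that, assuming the ServiceControl.Contracts package is referenced and leaving the SignalR push as a placeholder:

    public class MessageFailedHandler : IHandleMessages<ServiceControl.Contracts.MessageFailed>
    {
        public Task Handle(ServiceControl.Contracts.MessageFailed message, IMessageHandlerContext context)
        {
            // Inspect the failed message (e.g. its headers) to find your TaskId/UserId,
            // then push a failure notification to the UI.
            return NotifyUi(message.FailedMessageId);
        }

        Task NotifyUi(string failedMessageId)
        {
            // SignalR push omitted here
            return Task.CompletedTask;
        }
    }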

So subscribe to object and then look for ReceivePipelineCompleted? That doesn’t seem to be working either. It is receiving all of the other messages. I feel like I’m looking for the wrong event because ReceivePipelineCompleted isn’t even marked with IEvent

I see my commands but no other messages.

public class ReceiveCompletedHandler : IHandleMessages<object>
{
    public async Task Handle(object message, IMessageHandlerContext context)
    {
        if (!(message is ReceivePipelineCompleted x)) return;

        if (!x.ProcessedMessage.Headers.TryGetValue(CustomHeaders.TaskId, out var taskId) ||
            !x.ProcessedMessage.Headers.TryGetValue(CustomHeaders.UserId, out var userId) ||
            !x.ProcessedMessage.Headers.TryGetValue(CustomHeaders.ResponseQueue, out var respondTo)
        ) return;

        var options = new SendOptions();
        options.SetDestination(respondTo);

        options.CopyHeaders(x.ProcessedMessage.Headers, "DAS");
        
        await context
            .Send<DirectedReply>(c =>
            {
                c.TaskId = taskId;
                c.UserId = userId;
                c.Message = Encoding.UTF8.GetString(x.ProcessedMessage.Body);
            }, options)
            .ConfigureAwait(false);
    }
}

Just to make sure we're still on the same page: we're talking about catching the ReceivePipelineCompleted event so that I can send a message to the SignalR hub, and I'm trying to do this via a handler so that I can get a pipeline context injected for me to use, correct?

No, this has nothing to do with the pipeline; it's just a generic handler that will fire for all messages regardless of type. Just check for the headers and send the feedback message if they exist.

Does that make sense?