Finding a clean way to get feedback to the UI

Is there a specific reason to avoid using Reply in the message processing as that should naturally provide the correlation information back to the client?

Oh ok, I think I see what you’re saying and I think we’re saying the same thing from two different angles.

So if we’re talking about the receiver of the command, then that is the receiver and thus the Receive pipeline. I think the way I was looking at it was yes, it is the Receive pipeline until it comes time to publish an event (indicating success or fail) at which point it becomes a publishing pipeline, which is where I hooked into. Like this:

PublishReceive

Having clarified that, what did you have in mind?

A couple reasons came to mind,

  • Reply is the synchronous API, correct? I’m not trying to create a synchronous messaging system; I’m actually trying to follow a typical async pattern of “Send command, subscribe to response queue” but using a WebSocket which is also pretty common. The first command starts the process and then participants of that process can give feedback through events. Now that I type it out loud… if SignalR were integrated into the Saga somehow, the the Saga could be responsible for sending messages to a reply endpoint like SignalR. This of course, doesn’t cover cases where there is no Saga.

  • In another thread, I identified that even in the synchronous scenario (which we do have a couple cases for it), Reply is still troublesome because the endpoint has to be uniquely identifiable which gets really messy in a microservice environment such as Kubernetes where containers come up and go down and scale out and in automatically. Significant infrastructure scripting would have to be in place to keep that under control lest you end up with unique queues/exchanges scattered about the RMQ server, and then you end up with your messaging solution being coupled to your container orchestrator which isn’t great. Honestly, I think this is where a topic topology could really help keep things neat, but that isn’t an option at this time. I tried to offer a solution using Redis here: GitHub - jstafford5380/NServiceBus.Callbacks.Redis: Callback pattern implementation using Redis.

  • Assuming I used that solution, don’t replies still only go to the exact service that sent the message being replied to? Wouldn’t that mean that replies would have to unwind back up the graph to make it back to the original requester? The reality of the solution is that because NSB is .Net-only, commands from NodeJS (or any other stack) need to come in at a .Net api which creates the command and places it on the bus. So if the Gateway creates the command, then it would have to be the one receiving the reply, however the Gateway and the SignalR hub aren’t always the same service, so whether the downstream services get to send messages to SignalR directly or if a parent processes managing a Saga is the one to manage it, we still have to get messages forwarded to SignalR (or some other endpoint). The pattern I have above does that while also allowing the response endpoint to be specified at the top of the call.

Reply is the synchronous API, correct?

No, the reply just results in a message going back to the endpoint sending the message being replied to. You could even override the reply to address to have those replied go back to a “signalr feedback”-endpoint. That endpoint could use eg. a redis backplane to make sure that things works as expected if you web fronts are scaled out.

It still feels a bit off to me that it’s the publishing of an event that decides the success and failure of a UI command. What if you have a command that doesn’t result in an event?

Ie it feels more natural to me that successfully processing a message is the thing that decides if it was successful or not?

No, the reply just results in a message going back to the endpoint sending the message being replied to.

But is is the endpoint that originally sent the message, or is it the endpoint that sent the message to that particular service? What I mean is if the UI sends a message to service A which handles the original command, and A spawns a command to service B, and B spawns to C. If C replies, would that simply be replying to B but not A? If it were a Saga, then I see it would be the Saga that actually replied, but without a Saga, wouldn’t I have to modify the reply to address at every touch point?

Also, correct me if I’m wrong, but don’t replies require that I make the endpoints uniquely identifiable? I thought that was a requirement of the callbacks package. We discussed that one in a different thread (Callbacks uniquely identifiable endpoint concerns when deployed in docker - #26 by andreasohlund)

I don’t really want to commit to using that package with that behavior. I could use my Redis package in place of that, but that actually is a synchronous call. I might be able to modify or extend it to simply forward to an endpoint. I’ll need to churn on that one a bit.

I guess I had simply planned on just getting into the habit of always firing success and error events for all commands, but I kinda see what you’re getting at now. Do you think it is unreasonable or bad practice to always fire success/error events as a system requirement?

In the meanwhile, let’s go back to the receive pipeline thought that you had. How did you envision that working?

No, replies are part of the standard API

You only need the callbacks package which forces to you to make instances uniquely addressable if you need to use in-memory callbacks.

But is is the endpoint that originally sent the message, or is it the endpoint that sent the message to that particular service? What I mean is if the UI sends a message to service A which handles the original command, and A spawns a command to service B, and B spawns to C. If C replies, would that simply be replying to B but not A? If it were a Saga, then I see it would be the Saga that actually replied, but without a Saga, wouldn’t I have to modify the reply to address at every touch point?

Yes that is correct, the reply goes back to the endpoint that sent the message being replied to.

In our gitter thread we talked about dealing with this by using a custom “UIFeedbackEndpointAddress” header that you can “send” the UIFeedback message to. This would work across endpoint hops if you add a behavior to forward it to outgoing messages similar to how we propagate our own ConversationID. See https://github.com/Particular/NServiceBus/blob/develop/src/NServiceBus.Core/Causation/AttachCausationHeadersBehavior.cs#L35 for more details.

In short: You would context.Send to this address explicitly instead of doing a reply.

Does this make any sense?

Do you think it is unreasonable or bad practice to always fire success/error events as a system requirement?

Feel a bit like an infrastructure level requirement that leaks into your business code?

In the meanwhile, let’s go back to the receive pipeline thought that you had. How did you envision that working?

I would just add a behavior to the receive pipeline(a plain message handler would work as well) that would check for a UIFeedbackId+UIFeedbackEndpointAddress header and just send a UIFeedback message with the ID to that address. If the incoming message is processed ok the message will be emitted.

Do you need to report failure as well?

If yes subscribing to the MessageFailedevent

and sending a UICommandFailed message from there might do the trick?

A little bit, but less so that having to ensure all endpoints are Reply’ing to the messages. I’m trying to achieve more of “copy message to the specific endpoint” kind of approach. I’m just trying to make sure I’m not breaking some important semantic in the process :slight_smile:

In short: You would context.Send to this address explicitly instead of doing a reply.

I think that this is a bit more obtrusive. Now all endpoints that want to give feedback have to do so explicitly where I actually want it to be more passive. If I do so by creating a package to handle this form, it’s basically where I’m at right now, but I’m doing so in the publish pipeline of the receiving service.

I would just add a behavior to the receive pipeline(a plain message handler would work as well) that would check for a UIFeedbackId+UIFeedbackEndpointAddress header and just send a UIFeedback message with the ID to that address. If the incoming message is processed ok the message will be emitted.

Ah ok I think I see what you’re saying. So this is kind of like acking, but with a message? I think I can work with that concept. Lemme see what I can come up with.

Thank you for continuing this conversation. I’m learning a lot more about NSB’s capabilities and philosophy/semantics by solving real world problems like this :slight_smile:

Oh, so what should I be extending to achieve the suggestion for plugging into the Receive pipeline so that I can handle successfully processed messages? Is it just Behavior<IIncomingLogicalMessageContext> and check if message was handled?

Update: nevermind, just found Pipeline events • NServiceBus • Particular Docs but it doesn’t have anything available on it to send a message from within the delegate

Ok, so the receive pipeline (on completed) doesn’t have a native context to use for send messages out following a successful messaged handled. I had to do some factory shennanigans to make that work that I’m not a fan of. It would be nice if I had access to the NSB container or some way to have it injected. Perhaps theres some other way to plug into this that would allow injection?

Other than that, I think this is behaving the way I hoped it would but I did notice that headers are not being copied between requests. For example if UI sends command to A and A sends a command to B, it appears that all custom headers are lost between A and B. Is that to be expected?

Example:

public class Stage1Handler : IHandleMessages<Stage1Command>
{
    public async Task Handle(Stage1Command message, IMessageHandlerContext context)
    {
        if (!message.S1Succeed)
            throw new Exception("FUNCTIONAL TEST INDICATED TEST SHOULD FAIL");

        await context
            .Send<Stage2Command>("ServiceB", command =>
            {
                command.ShouldSucceed = message.S2Succeed;
                command.Message = "Command from Producer A";
            })
            .ConfigureAwait(false);
    }
}

This would be the handler that deals with the first command that comes from the UI. It sends another command to another service. When service B receives the message, all of the custom headers are missing. Isn’t the context supposed to keep those intact?

You mean to have access to a “.Send” on the context passed to the behaviour? (can you share the code that you had to write?)

You should be able to use a plain message handler for this purpose?

For example if UI sends command to A and A sends a command to B, it appears that all custom headers are lost between A and B. Is that to be expected?

This is by design since we can’t know which headers should be copied. Like I mentioned previously you can solve this by adding a custom behaviour that copies the relevant headers from the incoming message to all outgoing messages, see:

https://github.com/Particular/NServiceBus/blob/develop/src/NServiceBus.Core/Causation/AttachCausationHeadersBehavior.cs#L35

The code I wrote for the receive pipeline based on the doc I found:

endpointConfiguration.Pipeline.OnReceivePipelineCompleted(async completed =>
{
    if (!completed.ProcessedMessage.Headers.TryGetValue(CustomHeaders.TaskId, out var taskId) ||
        !completed.ProcessedMessage.Headers.TryGetValue(CustomHeaders.UserId, out var userId) ||
        !completed.ProcessedMessage.Headers.TryGetValue(CustomHeaders.ResponseQueue, out var respondTo)
    ) return;
    
    var options = new SendOptions();
    options.SetDestination(respondTo);

    var messageSession = sessionFactory(); // temporary solution
    await messageSession
        .Send<ResponseHubReply>(c =>
        {
            c.TaskId = taskId;
            c.UserId = userId;
            c.Message = Encoding.UTF8.GetString(completed.ProcessedMessage.Body);
        }, options)
        .ConfigureAwait(false);
});

What do you mean when you say I could use a plain message handler?

Gotcha. Ok that’s similar to what I was basically doing at this point, but I wanted to make sure I wasn’t missing something.

Almost there!

What do you mean when you say I could use a plain message handler?

What I meant is that since NServiceBus have what we call batched dispatch

this means that messages won’t be emitted should anything throw during the pipeline invocation.

Ie. you can just create a plain IHandleMessages that runs the code you have in OnReceivePipelineCompleted and achieve the same thing with much less code.

Does that make sense?

All that said if you want to emit message as a result of the MessageFailed event you would still need that code anyway :frowning:

I see. So in IHandleMessages<T> what is T then? I tried implementing IHandleMessages<ReceivePipelineCompleted> but that’s not working. It’s not getting hit and I noticed it doesn’t implement IMessage or IEvent. Looks the same for IHandleMessage<FailedMessage>

Use object as the T to have the handler fire for all incoming messages

For completeness if you are using ServiceControl dealing with failures would be as easy as hooking your “SignalR UI Feedback” endpoint up to the MessageFailed event emitted by ServiceControl

So subscribe to object and then look for ReceivePipelineCompleted? That doesn’t seem to be working either. It is receiving all of the other messages. I feel like I’m looking for the wrong event because ReceivePipelineCompleted isn’t even marked with IEvent

I see my commands but no other messages.

public class ReceiveCompletedHandler : IHandleMessages<object>
{
    public async Task Handle(object message, IMessageHandlerContext context)
    {
        if (!(message is ReceivePipelineCompleted x)) return;

        if (!x.ProcessedMessage.Headers.TryGetValue(CustomHeaders.TaskId, out var taskId) ||
            !x.ProcessedMessage.Headers.TryGetValue(CustomHeaders.UserId, out var userId) ||
            !x.ProcessedMessage.Headers.TryGetValue(CustomHeaders.ResponseQueue, out var respondTo)
        ) return;

        var options = new SendOptions();
        options.SetDestination(respondTo);

        options.CopyHeaders(x.ProcessedMessage.Headers, "DAS");
        
        await context
            .Send<DirectedReply>(c =>
            {
                c.TaskId = taskId;
                c.UserId = userId;
                c.Message = Encoding.UTF8.GetString(x.ProcessedMessage.Body);
            }, options)
            .ConfigureAwait(false);
    }
}

Just to make sure we’re still on the same page, we’re talking about catching the ReceivePipelineCompleted event so that I can send a message to the SignalR hub, and I’m trying to do this via a Handler so so that I can get a pipeline context injected for me to use, correct?

No this has nothing to do with the pipeline, it’s just a generic handler that will fire for all messages regardless of type. Just check for the headers and send the feedback message if they exists.

Does that make sense?

I think you lost me. I thought the objective was to only send the message if the command was successfully handled. Wouldn’t this approach just send a message for every incoming message simply because the message arrived?

The pipeline approach seemed good, it only lacked a context for me to send the message with. Right now, because of the order of how things happen, I have to pass in a factory to get ahold of the session

static async Task Main(string[] args)
{
    Console.Title = "Producer A";

    IMessageSession session = null;

    var config = new EndpointConfiguration("ProducerA");
    config.UsePersistence<LearningPersistence>();
    config.UseSerialization<NewtonsoftSerializer>();
    config.EnableInstallers();

    // session is null so delay the retrieval
    config.AllowReplyToHub(() => session);

    var transport = config.UseTransport<RabbitMQTransport>();
    transport.ConnectionString("host=rabbitmq;username=user;password=bitnami");
    transport.UseConventionalRoutingTopology();

    session = await Endpoint.Start(config);
    await Task.Delay(Timeout.Infinite);
}

This works, but feels wrong. I’m going to go wire this into the actual SignalR project to see if it’s any better when I have access to IServiceCollection

The thing is that you can always send the message, due to Batched message dispatch • NServiceBus • Particular Docs it would only ever be dispatched if the message pipeline is completed successfully.

Does that make any sense?