Callbacks uniquely identifiable endpoint concerns when deployed in docker

I was able to develop a pattern that might accomplish the same thing without the need for callbacks as they are currently implemented, and I think still keeps with the async messaging philosophy. Basically:

I created a special handler specifically for handling reply messages. So if you create one for your expected reply type, it’ll grab it. It relies on some form of “request id” in order to do this (I’m thinking of just using the correlation id), but it receives the message and then publishes it to a redis channel.

The original caller would essentially send the command and then call await SomeHandler.GetResponseAsync(redis, channelName, timeout). That static method subscribes to the channel and then blocks until it gets a message. If it gets one, it’ll deserialize it, then unsubscribe and return.

public abstract class MessageCallbackSubscription<T> 
    : IHandleMessages<T> where T : class
{
    private readonly IConnectionMultiplexer _cache;

    protected MessageCallbackSubscription(IConnectionMultiplexer cache)
    {
        _cache = cache;
    }
    
    protected abstract string GetChannelName(T message);

    public Task Handle(T message, IMessageHandlerContext context)
    {
        var channelName = GetChannelName(message);

        return _cache
            .GetSubscriber()
            .PublishAsync(channelName, JsonConvert.SerializeObject(message));
    }

    public static async Task<T> GetResponseAsync(
        IConnectionMultiplexer multiplexer, 
        string key, 
        int timeoutMilliseconds)
    {
        var expire = DateTime.Now.AddMilliseconds(timeoutMilliseconds);
        
        var result = string.Empty;
        var subscription = multiplexer.GetSubscriber();
        await subscription.SubscribeAsync(key, (ch, val) => result = val).ConfigureAwait(false);

        while (string.IsNullOrEmpty(result) && DateTime.Now < expire) { await Task.Delay(25); } // is there a less hackish way to do this?

        // Unsubscribe whether or not a reply arrived, so a timeout does not leave the subscription behind.
        await subscription.UnsubscribeAsync(key).ConfigureAwait(false);

        if (string.IsNullOrEmpty(result))
            return null;

        return JsonConvert.DeserializeObject<T>(result);
    }
}
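On the "less hackish" question in the polling loop: one option is to let the subscription callback complete a TaskCompletionSource and race it against a timer, instead of checking a variable every 25 ms. The following is only a minimal sketch of that idea. ReplyWaiter, WaitForReplyAsync, and the subscribe/unsubscribe delegates are hypothetical names introduced here so the example runs without Redis; they are not part of NServiceBus or StackExchange.Redis.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReplyWaiter
{
    // subscribe/unsubscribe are hypothetical stand-ins for the
    // SubscribeAsync/UnsubscribeAsync calls made in GetResponseAsync.
    public static async Task<string> WaitForReplyAsync(
        Func<Action<string>, Task> subscribe,
        Func<Task> unsubscribe,
        TimeSpan timeout)
    {
        // Completed by the subscription callback. Continuations run
        // asynchronously so the awaiting code does not execute on the
        // callback's thread.
        var reply = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        await subscribe(payload => reply.TrySetResult(payload)).ConfigureAwait(false);
        try
        {
            using (var cts = new CancellationTokenSource())
            {
                var timer = Task.Delay(timeout, cts.Token);
                var winner = await Task.WhenAny(reply.Task, timer).ConfigureAwait(false);

                if (winner != reply.Task)
                    return null; // timed out, no reply arrived

                cts.Cancel(); // the reply won, stop the timer
                return await reply.Task.ConfigureAwait(false);
            }
        }
        finally
        {
            // Runs on both the reply path and the timeout path.
            await unsubscribe().ConfigureAwait(false);
        }
    }
}
```

In GetResponseAsync, the subscribe delegate would wrap subscription.SubscribeAsync(key, (ch, val) => ...) and the unsubscribe delegate would wrap subscription.UnsubscribeAsync(key); the returned string would then go through JsonConvert.DeserializeObject<T> as before.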

The handler for the specific message currently looks like this:

public class ProvisionResponseHandler : MessageCallbackSubscription<ProvisionSmsNumberResponse>
{
    // Convenience methods to keep the keys in line
    public static string KeyFormat = "smsprov-{0}";
    public static string GetKey(object id) => string.Format(KeyFormat, id);

    protected override string GetChannelName(ProvisionSmsNumberResponse message)
    {
        return string.Format(KeyFormat, message.ClientId);
    }

    public ProvisionResponseHandler(IConnectionMultiplexer cache) : base(cache) { }
}

So the usage looks like this

var key = ProvisionResponseHandler.GetKey(clientId);
await bus.Send<ProvisionSmsNumber>("DAS.Services.Sms", c =>
{
    c.ClientId = clientId;
    c.PreferredAreaCode = 602;
});

var result = await ProvisionResponseHandler.GetResponseAsync(cache, key, 60000);

I created some static convenience methods to keep the channel name consistent between the handler and the caller, but I’m still playing with it as I’m not quite satisfied.

I’m still testing, but in theory this should work, since it is the instance that sends the first command that blocks waiting for a response, so there’s no need for a uniquely identifiable queue or any concern about competing consumers. Also, since there should only be a single subscriber per request, once I finish up and unsubscribe, the Redis channel has no subscribers left and simply goes away. Nice and neat. In this example, I’m using the ClientId as a unique key for the request, which isn’t great, but it was available on the existing objects so I just used it to prove this out. In practice, I’d choose something more reliable, like the correlationId.
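One thing worth checking while testing is ordering. Redis pub/sub does not buffer: a message published to a channel with no subscriber is dropped. In the usage shown earlier, the command is sent before GetResponseAsync subscribes, so a reply that comes back very quickly could in principle be missed. The sketch below illustrates the effect with a hypothetical in-memory channel (FireAndForgetChannel and OrderingDemo are made-up names for illustration, not Redis or NServiceBus types).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a fire-and-forget pub/sub channel:
// a message published while nobody is subscribed is simply dropped.
public sealed class FireAndForgetChannel
{
    private readonly List<Action<string>> _subscribers = new List<Action<string>>();

    public void Subscribe(Action<string> handler) => _subscribers.Add(handler);

    // Returns the number of subscribers that received the message.
    public int Publish(string payload)
    {
        foreach (var subscriber in _subscribers)
            subscriber(payload);
        return _subscribers.Count;
    }
}

public static class OrderingDemo
{
    // Mirrors "send the command, then subscribe": the reply is published
    // before anyone listens, so it is lost.
    public static string SendThenSubscribe()
    {
        var channel = new FireAndForgetChannel();
        string received = null;
        channel.Publish("reply");
        channel.Subscribe(p => received = p);
        return received;
    }

    // Subscribing first guarantees the reply has a listener.
    public static string SubscribeThenSend()
    {
        var channel = new FireAndForgetChannel();
        string received = null;
        channel.Subscribe(p => received = p);
        channel.Publish("reply");
        return received;
    }
}
```

If this turns out to matter in practice, one way to close the window would be to split the helper so the subscription is established before bus.Send is called and only the wait happens afterwards.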

Let me know what you think or if you spot any problems with the approach.

@mauroservienti @danielmarbach

So I just noticed that sendoptions.Get/SetCorrelationId() has recently been deprecated and that there is a plan to remove it completely in 8.0. This is very upsetting and annoying. This breaks our entire tasking architecture that we were in the middle of building.

I can still see it in the message headers in the handler when a message is received, but I need that ID at Send/Publish time so I can send the correct events over SignalR back to the front end so that they can get feedback.

I’m assuming NSB had something else in mind for this? Conversation ID? I can’t seem to get at that either. I tried setting the correlationId through the header collection directly, but it’s being overridden. Is ConversationID safe to use for this?

I’ve created this if someone wants to take a look at it and give some feedback. This leverages Redis to provide a one-time use reply topic, effectively supplementing where a topic topology isn’t available (something else I’m looking to contribute to the RMQ transport). This essentially creates a backplane using Redis.

Hi Jeremy, the reasoning was that users should never set or get that value. It does seem that you have a use case for at least reading it. Like you say you can still get the value from the message headers.

but I need that ID at Send/Publish time so I can send the correct events over SignalR back to the front end so that they can get feedback.

So you need to store some ID attached to the outgoing messages in order to correlate events coming over SignalR back to the message that caused them to trigger?

Is that correct?

Cheers,

Andreas

So you need to store some ID attached to the outgoing messages in order to correlate events coming over SignalR back to the message that caused them to trigger?

Essentially yes. The idea is that when the front end says “do the thing” we will respond with 202 and the id, so when events start firing over the bus, we need to a) know which client to forward those events to and b) give the client enough information to regain context for that task

For example, say client ‘a’ sends the command “update user” and then sends another “update user” command. Over our microservice platform, we may see one event “user updated” and another event “user update failed”. By providing a correlation id (or conversationid), the client will know which event belongs to which task, so that they could, for instance, display a toast notification “update failed, click here for more information” and then know what data to display, or perhaps take the user back to the form that started the task.

It appears that conversation ID will achieve the same effect, but it seems odd that I have to rely on a header that NSB introduced (conversation id) rather than use a ubiquitous header (i.e. correlation id) for consistency’s sake. For example, rabbitmq even has the ‘correlationId’ field baked into their options object for the client to set. In the meantime, I’m just wondering if there are some other side effects of conversation id that I should be aware of.

The idea is that when the front end says “do the thing” we will respond with 202 and the id, so when events start firing over the bus, we need to a) know which client to forward those events to and b) give the client enough information to regain context for that task

Got it, seems like a legit use case to me

It appears that conversation ID will achieve the same effect, but it seems odd that I have to rely on a header that NSB introduced (conversation id) rather than use a ubiquitous header (i.e. correlation id) for consistency sake.

To be fair ConversationId is also one of those ubiquitous headers - Correlation and Conversations - Enterprise Integration Patterns

For example, rabbitmq even has the ‘correlationId’ field baked into their options object for the client to set. In the meanwhile, I’m just wondering if there are some other side effects of conversation id that I should be aware of.

I would recommend going with your own header to avoid infrastructure like NServiceBus and RabbitMQ interfering with your own functionality. Having your own Acme.WebCallbackId or similar would be bulletproof and more explicit?

It would also arguably be better for monitoring etc., since logs would contain that header instead of some opaque “correlation id”.

Thoughts?
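As a rough illustration of the custom-header suggestion, the sketch below attaches an application-owned ID to an outgoing command and reads it back in a handler. It assumes NServiceBus 6 or later; the header name comes from the suggestion above, while UpdateUser, WebCallback, and UpdateUserHandler are hypothetical names used only for this example. It does not show how the ID would be carried onto messages that the handler itself sends or publishes.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical command used only for this example.
public class UpdateUser : ICommand
{
    public string UserId { get; set; }
}

public static class WebCallback
{
    // Application-owned header name, as suggested above.
    public const string HeaderName = "Acme.WebCallbackId";

    // Builds send options that carry the callback ID as a message header.
    public static SendOptions CreateOptions(string destination, string callbackId)
    {
        var options = new SendOptions();
        options.SetDestination(destination);
        options.SetHeader(HeaderName, callbackId);
        return options;
    }

    // Sends the command and returns the ID, e.g. for the body of a 202 response.
    public static async Task<string> SendAsync(
        IMessageSession session, string destination, object command)
    {
        var callbackId = Guid.NewGuid().ToString();
        await session.Send(command, CreateOptions(destination, callbackId))
            .ConfigureAwait(false);
        return callbackId;
    }
}

public class UpdateUserHandler : IHandleMessages<UpdateUser>
{
    public Task Handle(UpdateUser message, IMessageHandlerContext context)
    {
        // The header arrives with the incoming message.
        if (context.MessageHeaders.TryGetValue(WebCallback.HeaderName, out var callbackId))
        {
            // callbackId identifies the task the front end is waiting on.
        }
        return Task.CompletedTask;
    }
}
```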

I think that’s reasonable. Maybe something like “TaskId”. That’ll work, I think. Does NSB just keep all original headers intact throughout the context?

Any thoughts on the Callback solution that I implemented above, using Redis as a backplane in order to eliminate the need for uniquely identifiable endpoints? It was just by chance that I came across an article about an hour ago, where Microsoft solved a similar problem with SignalR using Redis as a backplane to ensure horizontally distributed services could still route messages to the correct instance with the user socket connection.

Maybe something like “TaskId”. That’ll work think

:+1:

Does NSB just keep all original headers intact throughout the context?

Yes, we do not mutate headers in general. (There are a few “counters”, like some of the retry headers, that might get incremented - Message Headers • NServiceBus • Particular Docs.)

Any thoughts on the Callback solution that I implemented above, using Redis as a backplane in order to eliminate the need for uniquely identifiable endpoints?

Took a brief look and it indeed seems like a good idea for Redis users :+1:


It’s been a while, but the problem is still there. When deployed in docker, it’s hard to find any persistent id for the MakeInstanceUniquelyAddressable call. Moreover, I don’t want callback queues and exchanges to be persistent. When the container stops, I no longer need replies addressed to this instance. So in my case, the auto-delete option is needed for callback queues and exchanges.
To support auto-delete on uniquely identifiable endpoints, I currently must customize the whole IRoutingTopology, and inside it I must use some heuristic to find which of the IEnumerable<string> receivingAddresses is for callbacks.

Is there a way to support the auto-delete option for callback queues and exchanges inside the core NSB libs?


I have opened a feature request with a possible solution. RabbitMQ is addressed in the second comment.