Callbacks: uniquely identifiable endpoint concerns when deployed in Docker

I really am trying to avoid the callbacks tbh. It’s difficult to explain and it smells to me.

The use case is that the UI is creating a configuration for a customer. The server can accept configurations that are invalid (i.e. we save it, it just isn’t usable – it’s really just a rule set for a worker-type service). The desire is for the server to respond to the “save configuration” command with a validation result that can then be used by the UI to display a warning if the configuration is unusable.

My initial thought was to have the API respond with a location header and then have the UI immediately query that resource to get the final object which will include a validation result. That way the server can just accept the message, save the configuration, validate it, and then update the resource with that result. A potential issue here could be a race condition between us getting that data updated (we’re eventually consistent) and the UI querying for the result.
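To make that concrete, something along these lines is what I had in mind (just a sketch; the controller, message type, and route are all made up, and it assumes an IMessageSession injected into the API):

// Sketch only — ConfigurationDto, SaveConfiguration and the route are illustrative.
public class ConfigurationsController : ControllerBase
{
    private readonly IMessageSession _messageSession;

    public ConfigurationsController(IMessageSession messageSession)
    {
        _messageSession = messageSession;
    }

    [HttpPost("configurations")]
    public async Task<IActionResult> Save([FromBody] ConfigurationDto dto)
    {
        var id = Guid.NewGuid();

        // Hand the configuration to the command model; validation happens asynchronously.
        await _messageSession.Send(new SaveConfiguration { ConfigurationId = id, Configuration = dto });

        // 202 + Location header: the UI immediately queries this resource,
        // which will eventually carry the validation result.
        return Accepted($"/configurations/{id}");
    }
}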

It’s not a huge deal at the moment. But I’d also like to understand how to make this work for the future where we simply cannot get around the need for a synchronous operation like this. I’d like to be able to achieve it using the bus if possible so that I don’t have to expose APIs on my command model (they run as internal services as it is, so it would change too much about the architecture).

As for exposing the container ID through environment variables, I think that would have the same effect as the machine name, no? The container ID is basically the machine name. The problem is cleaning up those queues as they become unused. For example, a container gets added during autoscaling, so a new queue gets added. Then the container gets removed during autoscaling, but the queue remains.
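(To be clear, I assume the suggestion amounts to something like the sketch below; inside a Docker container, HOSTNAME usually resolves to the container ID, and under Kubernetes to the pod name.)

// Sketch only — the endpoint name is illustrative.
var endpointConfiguration = new EndpointConfiguration("DAS.Api");

// Inside a container this is effectively the container ID (or pod name).
var discriminator = Environment.GetEnvironmentVariable("HOSTNAME")
                    ?? Environment.MachineName;

// Creates the instance-specific queue that callback replies get routed to.
endpointConfiguration.MakeInstanceUniquelyAddressable(discriminator);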

The suggestion above was to auto-delete the queue, which sounded OK at first (despite the custom configuration), but then it seems flawed: if we stop the container without the intent to delete it, the queue would still be deleted, which could cause problems in that those messages would be lost. But I guess that’s a moot point, because such is the nature of synchronous messaging… I’m still thinking through this…

Well, you can make creating and deleting the instance-specific queues part of your scale-up and scale-down infrastructure concerns. Then your problem would be solved, right?

How do you mean? The scaling is done by Kubernetes, which scales out by adding or removing containers. The queues are inside RabbitMQ, which is not running in a container.

I think for that to work, kube would have to provide hooks that fire when scaling up and down, and some other manager would have to then send in commands to clean up the queues. >:/

Maybe it’s just easier to say that NSB can’t cover this elastic scaling scenario?

It seems that charles’s solution is probably the only viable workaround for this. It may even be worth adding as a provision to the rabbitmq transport package. Not sure how other transports would have to deal with it.

Jeremy,

For my curiosity, what are you using callbacks for in your scenario?

Callbacks were originally designed with legacy systems migration in mind. Callbacks allow senders to wait for a reply, and to force a reply to be delivered to the same instance that sent the original message.

In what seems to be a fully stateless environment (containers + Kubernetes), I’m just wondering what you are using callbacks for.

.m

Jeremy,

I was thinking about using hooks.

Regards
Daniel

@mauroservienti the scenario is described above.

For this particular scenario, I’ve instructed the developer to drop it completely. The server will now validate the configuration separately from the front end.

Found, thanks.
What about an approach like the following:

  • client sends the configuration
  • API server accepts it, and returns HTTP 202
  • API server sends a message to the backend
  • backend validates the incoming configuration
  • backend publishes an event: ConfigurationAccepted or ConfigurationRejected (sketched after this list)
  • API server is subscribed to the aforementioned events; when an event is received:
  • API forwards the event to client(s) via WebSocket
  • API stores the event result locally so as to allow clients to query for status in case the WebSocket connection is lost (e.g. users hit refresh)
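A rough sketch of the backend part, just to make the flow concrete (SaveConfiguration, ConfigurationRules, and the two events are made-up names):

// Sketch only — message and event types are illustrative.
public class ValidateConfigurationHandler : IHandleMessages<SaveConfiguration>
{
    public Task Handle(SaveConfiguration message, IMessageHandlerContext context)
    {
        var errors = ConfigurationRules.Validate(message.Configuration);

        // Publish the outcome; the API endpoint subscribes to both events,
        // pushes them to connected clients over the WebSocket, and stores them
        // so clients can query the status later.
        return errors.Any()
            ? context.Publish(new ConfigurationRejected { ConfigurationId = message.ConfigurationId, Errors = errors })
            : context.Publish(new ConfigurationAccepted { ConfigurationId = message.ConfigurationId });
    }
}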

Callbacks are not required with such an approach. The user experience at the UI level can be designed to adapt to a task-based environment such as the above. All actors in the system are disposable: the API can now fail and be recycled without losing any information, whereas with callbacks, if the API instance waiting for the callback dies, there is no easy way to recover.

Thoughts?

.m

Yeah, it’s a good design, and that’s similar to what I was saying as a potential solution, only using sockets instead. I think this is probably a great solution for a more matured product, and we’ll likely move towards something like this before too long. Plus, I believe SignalR isn’t due out for Core until 2.1, so we’ve got some time. Our front-end guys are just getting Redux set up in there, so we’ll likely be able to leverage that for this at some point. In the meanwhile, I’ve dropped the callback in favor of client-side validation.

So I have another scenario here where we don’t have control of the front end, so I don’t think WebSockets is an option. The scenario is Salesforce. The business wants to use it as a front end for provisioning SMS numbers. It can send the request to do so just fine, but it needs a response so that it can store the value in the account. Without a way to make callbacks work with autoscaling, it seems my only option is to push the data into SFDC out of band. This is still not ideal, however, because the human working in SFDC won’t be able to tell the customer what their SMS number is.

This is a larger design problem that we will eventually solve by moving that process into a system that we own, but in the meanwhile… :confused:

I was able to develop a pattern that might accomplish the same thing without the need for callbacks as they are currently implemented, and I think it still keeps with the async messaging philosophy. Basically:

I created a special handler specifically for handling reply messages. So if you create one for your expected reply type, it’ll grab it. It relies on some form of “request id” in order to do this (I’m thinking of just using the correlation id), but it receives the message and then publishes it to a redis channel.

The original caller would essentially send the command and then call await SomeHandler.GetResponseAsync(redis, channelName, timeout). That static method subscribes to the channel and then blocks until it gets a message. If it gets one, it’ll deserialize it, then unsubscribe and return.

using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NServiceBus;
using StackExchange.Redis;

public abstract class MessageCallbackSubscription<T>
    : IHandleMessages<T> where T : class
{
    private readonly IConnectionMultiplexer _cache;

    protected MessageCallbackSubscription(IConnectionMultiplexer cache)
    {
        _cache = cache;
    }

    // Derived handlers decide which Redis channel a given reply belongs to.
    protected abstract string GetChannelName(T message);

    public Task Handle(T message, IMessageHandlerContext context)
    {
        var channelName = GetChannelName(message);

        // Forward the reply onto the Redis channel so the waiting instance picks it up.
        return _cache
            .GetSubscriber()
            .PublishAsync(channelName, JsonConvert.SerializeObject(message));
    }

    public static async Task<T> GetResponseAsync(
        IConnectionMultiplexer multiplexer, 
        string key, 
        int timeoutMilliseconds)
    {
        var expire = DateTime.Now.AddMilliseconds(timeoutMilliseconds);
        
        var result = string.Empty;
        var subscription = multiplexer.GetSubscriber();
        await subscription.SubscribeAsync(key, (ch, val) => result = val).ConfigureAwait(false);

        // Wait for the reply to arrive or the timeout to expire.
        while (string.IsNullOrEmpty(result) && DateTime.Now < expire) { await Task.Delay(25); } // is there a less hackish way to do this?

        // Unsubscribe either way so the one-time channel doesn't linger after a timeout.
        await subscription.UnsubscribeAsync(key).ConfigureAwait(false);

        if (string.IsNullOrEmpty(result))
            return null;
        
        return JsonConvert.DeserializeObject<T>(result);
    }
}
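One alternative I’ve been toying with to get rid of that polling loop (just a sketch, not tested) is to complete a TaskCompletionSource from the subscription callback instead:

// Sketch only — a drop-in replacement for GetResponseAsync inside the same class.
public static async Task<T> GetResponseAsync(
    IConnectionMultiplexer multiplexer,
    string key,
    int timeoutMilliseconds)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    var subscriber = multiplexer.GetSubscriber();

    // The callback completes the task the caller is awaiting.
    await subscriber.SubscribeAsync(key, (channel, value) => tcs.TrySetResult(value)).ConfigureAwait(false);

    try
    {
        var completed = await Task.WhenAny(tcs.Task, Task.Delay(timeoutMilliseconds)).ConfigureAwait(false);
        if (completed != tcs.Task)
            return null; // timed out waiting for the reply

        return JsonConvert.DeserializeObject<T>(tcs.Task.Result);
    }
    finally
    {
        // Always tear down the one-time channel.
        await subscriber.UnsubscribeAsync(key).ConfigureAwait(false);
    }
}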

The handler for the specific message currently looks like this:

public class ProvisionResponseHandler : MessageCallbackSubscription<ProvisionSmsNumberResponse>
{
    // Convenience methods to keep the keys in line
    public static string KeyFormat = "smsprov-{0}";
    public static string GetKey(object id) => string.Format(KeyFormat, id);

    protected override string GetChannelName(ProvisionSmsNumberResponse message)
    {
        return string.Format(KeyFormat, message.ClientId);
    }

    public ProvisionResponseHandler(IConnectionMultiplexer cache) : base(cache) { }
}

So the usage looks like this:

var key = ProvisionResponseHandler.GetKey(clientId);
await bus.Send<ProvisionSmsNumber>("DAS.Services.Sms", c =>
{
    c.ClientId = clientId;
    c.PreferredAreaCode = 602;
});

var result = await ProvisionResponseHandler.GetResponseAsync(cache, key, 60000);

I created some static convenience methods to keep the channel name consistent, but I’m still playing with it as I’m not quite satisfied.

I’m still testing, but in theory this should work, since it is the instance that sends the first command that blocks waiting for a response, so there’s no need for a uniquely identifiable queue or any concern about competing consumers. Also, since there should only be a single subscriber per request, once I finish up and unsubscribe, Redis behavior makes it so that channel dies forever. Nice and neat. In this example, I’m using the ClientId as a unique key for the request, which isn’t great, but it was available on the existing objects so I just used it to prove the concept. In practice, I’d choose something more reliable like the correlationId or something.

Let me know what you think or if you spot any problems with the approach.

@mauroservienti @danielmarbach

So I just noticed that sendOptions.Get/SetCorrelationId() has recently been deprecated and that there is a plan to remove it completely in 8.0. This is very upsetting and annoying. This breaks our entire tasking architecture that we were in the middle of building.

I can still see it in the message headers in the handler when a message is received, but I need that ID at Send/Publish time so I can send the correct events over SignalR back to the front end so that they can get feedback.
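(To illustrate what I mean on the receiving side — the handler name here is just a placeholder:)

// Reading the correlation ID from the incoming headers in a handler still works.
public class ProvisionResponseLogger : IHandleMessages<ProvisionSmsNumberResponse>
{
    public Task Handle(ProvisionSmsNumberResponse message, IMessageHandlerContext context)
    {
        var correlationId = context.MessageHeaders[Headers.CorrelationId];
        // ... correlate however we need to on this side
        return Task.CompletedTask;
    }
}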

I’m assuming NSB had something else in mind for this??? Conversation ID? I can’t seem to get at that either. I tried setting the correlation ID through the header collection directly, but it’s being overridden. Is ConversationID safe to use for this?

I’ve created this if someone wants to take a look at it and give some feedback. This leverages Redis to provide a one-time use reply topic, effectively supplementing where a topic topology isn’t available (something else I’m looking to contribute to the RMQ transport). This essentially creates a backplane using Redis.

Hi Jeremy, the reasoning was that users should never set or get that value. It does seem that you have a use case for at least reading it. Like you say, you can still get the value from the message headers.

but I need that ID at Send/Publish time so I can send the correct events over SignalR back to the front end so that they can get feedback.

So you need to store some ID attached to the outgoing messages in order to correlate events coming over SignalR back to the message that caused them to trigger?

Is that correct?

Cheers,

Andreas

So you need to store some ID attached to the outgoing messages in order to correlate events coming over SignalR back to the message that caused them to trigger?

Essentially yes. The idea is that when the front end says “do the thing” we will respond with 202 and the id, so when events start firing over the bus, we need to a) know which client to forward those events to and b) give the client enough information to regain context for that task

For example, client ‘a’ sends the command “update user” and then sends another “update user” command. Over our microservice platform, we may see an event “user updated” and another event “user update failed”. By providing a correlation ID (or conversation ID), the client will know which event belongs to which task, so that it could, for instance, display a toast notification “update failed, click here for more information” and then know what data to display, or perhaps take the user back to the form that started the task.

It appears that conversation ID will achieve the same effect, but it seems odd that I have to rely on a header that NSB introduced (conversation ID) rather than use a ubiquitous header (i.e. correlation ID) for consistency’s sake. For example, RabbitMQ even has the ‘correlationId’ field baked into its options object for the client to set. In the meanwhile, I’m just wondering if there are some other side effects of conversation ID that I should be aware of.

The idea is that when the front end says “do the thing” we will respond with 202 and the id, so when events start firing over the bus, we need to a) know which client to forward those events to and b) give the client enough information to regain context for that task

Got it, seems like a legit use case to me

It appears that conversation ID will achieve the same effect, but it seems odd that I have to rely on a header that NSB introduced (conversation ID) rather than use a ubiquitous header (i.e. correlation ID) for consistency’s sake.

To be fair, ConversationId is also one of those ubiquitous headers - see Correlation and Conversations - Enterprise Integration Patterns

For example, RabbitMQ even has the ‘correlationId’ field baked into its options object for the client to set. In the meanwhile, I’m just wondering if there are some other side effects of conversation ID that I should be aware of.

I would recommend going with your own header to avoid infrastructure like NServiceBus and RabbitMQ interfering with your own functionality. Having your own Acme.WebCallbackId or similar would be bulletproof and more explicit?

It would also arguably be better for monitoring etc., since logs would contain that header instead of some opaque “correlation id”.
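Something along those lines, reusing your “update user” example (sketch only; the header name and message types are illustrative):

// Sketch only — "Acme.WebCallbackId" and the message types are made-up names.
public static class TaskIdSender
{
    // Sending side: stamp the outgoing command with your own id.
    public static Task SendUpdateUser(IMessageSession session, Guid userId, Guid taskId)
    {
        var options = new SendOptions();
        options.SetDestination("UserService"); // illustrative destination
        options.SetHeader("Acme.WebCallbackId", taskId.ToString());

        return session.Send(new UpdateUser { UserId = userId }, options);
    }
}

// Receiving side: read the header and copy it explicitly onto the events you publish
// (custom headers aren't propagated onto outgoing messages automatically).
public class UpdateUserHandler : IHandleMessages<UpdateUser>
{
    public Task Handle(UpdateUser message, IMessageHandlerContext context)
    {
        var callbackId = context.MessageHeaders["Acme.WebCallbackId"];

        return context.Publish(new UserUpdated
        {
            UserId = message.UserId,
            WebCallbackId = callbackId
        });
    }
}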

Thoughts?

I think that’s reasonable. Maybe something like “TaskId”. That’ll work, I think. Does NSB just keep all original headers intact throughout the context?

Any thoughts on the Callback solution that I implemented above, using Redis as a backplane in order to eliminate the need for uniquely identifiable endpoints? It was just by chance that I came across an article about an hour ago where Microsoft solved a similar problem with SignalR, using Redis as a backplane to ensure horizontally distributed services could still route messages to the correct instance holding the user’s socket connection.

Maybe something like “TaskId”. That’ll work, I think

:+1:

Does NSB just keep all original headers intact throughout the context?

Yes, we do not mutate headers in general (there are a few “counters”, like some of the retry headers, that might get incremented - see Message Headers • NServiceBus • Particular Docs).

Any thoughts on the Callback solution that I implemented above, using Redis as a backplane in order to eliminate the need for uniquely identifiable endpoints?

Took a brief look and it indeed seems like a good idea for Redis users :+1:

It’s been a while, but the problem is still there. When deployed in Docker, it’s hard to find any persistent ID for the MakeInstanceUniquelyAddressable call. Moreover, I don’t want callback queues and exchanges to be persistent: when the container stops, I no longer need replies addressed to that instance, so in my case the auto-delete option is needed for callback queues and exchanges.
To support auto-delete on uniquely identifiable endpoints, I now have to customize the whole IRoutingTopology, and inside it use some heuristic to figure out which of the IEnumerable<string> receivingAddresses is for callbacks.

Is there a way to support the auto-delete option for callback queues and exchanges inside the core NSB libs?

I have opened a feature request with a possible solution. RabbitMQ is addressed in the second comment.