I was able to develop a pattern that might accomplish the same thing without the need for callbacks as they are currently implemented, and I think still keeps with the async messaging philosophy. Basically:
I created a special handler specifically for handling reply messages. So if you create one for your expected reply type, it’ll grab it. It relies on some form of “request id” in order to do this (I’m thinking of just using the correlation id), but it receives the message and then publishes it to a redis channel.
The original caller would essentially send the command and then call await SomeHandler.GetResponseAsync(redis, channelName, timeout)
. That static method subscribes to the channel and then blocks until it gets a message. If it gets one, it’ll deserialize it, then unsubscribe and return.
public abstract class MessageCallbackSubscription<T>
: IHandleMessages<T> where T : class
{
private readonly IConnectionMultiplexer _cache;
protected MessageCallbackSubscription(IConnectionMultiplexer cache)
{
_cache = cache;
}
protected abstract string GetChannelName(T message);
public Task Handle(T message, IMessageHandlerContext context)
{
var channelName = GetChannelName(message);
return _cache
.GetSubscriber()
.PublishAsync(channelName, JsonConvert.SerializeObject(message));
}
public static async Task<T> GetResponseAsync(
IConnectionMultiplexer multiplexer,
string key,
int timeoutMilliseconds)
{
var expire = DateTime.Now.AddMilliseconds(timeoutMilliseconds);
var result = string.Empty;
var subscription = multiplexer.GetSubscriber();
await subscription.SubscribeAsync(key, (ch, val) => result = val).ConfigureAwait(false);
while (string.IsNullOrEmpty(result) && DateTime.Now < expire) { await Task.Delay(25); } // is there a less hackish way to do this?
if (string.IsNullOrEmpty(result))
return null;
var response = JsonConvert.DeserializeObject<T>(result);
await subscription.UnsubscribeAsync(key).ConfigureAwait(false);
return response;
}
}
The handler for the specific message currently looks like this:
public class ProvisionResponseHandler : MessageCallbackSubscription<ProvisionSmsNumberResponse>
{
// Convenience methods to keep the keys in line
public static string KeyFormat = "smsprov-{0}";
public static string GetKey(object id) => string.Format(KeyFormat, id);
protected override string GetChannelName(ProvisionSmsNumberResponse message)
{
return string.Format(KeyFormat, message.ClientId);
}
public ProvisionResponseHandler(IConnectionMultiplexer cache) : base(cache) { }
}
So the usage looks like this
var key = ProvisionResponseHandler.GetKey(clientId);
await bus.Send<ProvisionSmsNumber>("DAS.Services.Sms", c =>
{
c.ClientId = clientId;
c.PreferredAreaCode = 602;
});
var result = await ProvisionResponseHandler.GetResponseAsync(cache, key, 60000);
I created some static convenience methods to keep the channel name inline, but I’m still playing with it as I’m not quite satisfied.
I’m still testing, but in theory, this should work since the it is the instance that sends the first command that blocks waiting for a response, so there’s no need for a uniquely identifiable queue or any concern for competing consumers. Also, since there should only be a single subscriber per request, once I finish up and unsubscribe, redis behavior makes it so that channel dies forever. Nice and neat. In this example, I’m using the ClientId as a unique key for the request which isn’t great, but it was available on the existing objects so I just used it to proof this. In practice, I’d choose something more reliable like the correlationId or something.
Let me know what you think or if you spot any problems with the approach.