WebSocket -> NSB Gateway

I’m building a gateway between WebSockets and NSB. It has two main purposes:

  1. Receive messages over websockets and send them using NSB to the correct endpoint.
  2. Subscribe to events (object handler), find the correct WebSocket clients and forward the messages to them.

I’ve built everything and it works great with one exception:

All message assemblies must be loaded in the gateway for this to work. From an implementation perspective that should not be necessary, since the gateway does not process the messages at all. And we have a lot of API/DTO assemblies ;)

So I’m wondering if there is some way to receive messages in an NSB endpoint without having them deserialized from JSON to .NET objects?

Do I simply do something like the multi serializer example but just leave them as byte arrays? Or do you suggest another solution?

I got it working.

Clients --object--> WebSocket --json--> Gateway --json--> NSB --object--> MicroService

WebSockets to NSB

I wrap the incoming byte array in a RawMessage together with Headers.ContentType and Headers.EnclosedMessageTypes. The custom serializer then just copies them to the outbound physical message.
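For readers following along, here is a minimal sketch of what RawMessage could look like. The class itself is not shown in this thread, so its shape is an assumption inferred from how the connectors in the Code section use it (a constructor taking message ID, headers and body, plus a settable ReplyToAddress). The body is assumed to be a byte[]; newer NServiceBus versions may expose the body as a different type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical envelope type: carries the untouched payload plus the
// headers the gateway needs, so no API/DTO assembly has to be loaded.
public class RawMessage
{
    public RawMessage(string messageId, IReadOnlyDictionary<string, string> headers, byte[] body)
    {
        MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
        Headers = headers ?? throw new ArgumentNullException(nameof(headers));
        Body = body ?? throw new ArgumentNullException(nameof(body));
    }

    // ID used to track request/reply pairs.
    public string MessageId { get; }

    // Expected to contain at least the content type and the
    // enclosed message types headers.
    public IReadOnlyDictionary<string, string> Headers { get; }

    // Serialized payload (JSON here), never deserialized by the gateway.
    public byte[] Body { get; }

    // Optional address that replies should be sent to.
    public string ReplyToAddress { get; set; }
}
```

Depending on the message conventions configured for the endpoint, the type probably also needs to be recognizable as a message (for example through a marker interface), which this sketch leaves out.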

All regular endpoints seem to work with the messages from NSB.

NSB to WebSockets

The deserializer does the opposite. It takes the JSON and puts it in the RawMessage. Then I just need an IHandleMessages<RawMessage> to receive it, see which WebSocket clients it should go to and invoke them.
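A minimal sketch of such a handler, assuming the NServiceBus package is referenced and that RawMessage is the envelope type described above. IWebSocketClientRegistry and its ForwardAsync method are hypothetical names introduced only for illustration; how the gateway actually finds the right clients is not shown in this thread.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical abstraction over the connected WebSocket clients.
public interface IWebSocketClientRegistry
{
    // Finds the clients interested in the message and pushes the raw JSON to them.
    Task ForwardAsync(RawMessage message);
}

// Receives every incoming message as a RawMessage, because the custom
// deserialize connector wraps all payloads in that single type.
public class RawMessageHandler : IHandleMessages<RawMessage>
{
    readonly IWebSocketClientRegistry clients;

    public RawMessageHandler(IWebSocketClientRegistry clients)
    {
        this.clients = clients;
    }

    public Task Handle(RawMessage message, IMessageHandlerContext context)
    {
        // The body is forwarded as-is; the gateway never inspects the payload.
        return clients.ForwardAsync(message);
    }
}
```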

MessageId

My only concern is that I generate message IDs myself and then add them to Headers.MessageId in the serializer. I need that to be able to track request/replies so that I can send the reply to the correct WebSocket client.

Will that break something in NSB?
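To make the tracking concrete, here is a rough sketch of how the generated IDs could be mapped back to clients. PendingRequests and its members are hypothetical names, not part of NSB. It assumes the reply carries the ID of the original request (as far as I know NSB puts that in the Headers.RelatedTo header on replies, but verify this for your version).

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: remembers which WebSocket client sent which request.
public class PendingRequests
{
    readonly ConcurrentDictionary<string, string> clientByMessageId =
        new ConcurrentDictionary<string, string>();

    // Generates a message ID for an outgoing request and records its sender.
    public string Register(string clientId)
    {
        var messageId = Guid.NewGuid().ToString();
        clientByMessageId[messageId] = clientId;
        return messageId;
    }

    // Looks up (and forgets) the client for a reply that references the
    // given request ID. Returns false if the ID is unknown.
    public bool TryComplete(string requestMessageId, out string clientId)
    {
        return clientByMessageId.TryRemove(requestMessageId, out clientId);
    }
}
```

The ID returned by Register would be the value placed in RawMessage.MessageId. Entries are removed on the first reply, so a request that expects several replies would need a different policy.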

Code

using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Pipeline;
using NServiceBus.Unicast.Messages;

// Replaces the regular serialization stage: copies the raw body and the
// relevant headers from the RawMessage to the outgoing physical message.
class RawSerializeConnector :
    StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        // Some messages are sent without a body; pass them on empty.
        if (context.ShouldSkipSerialization())
        {
            var emptyMessageContext = this.CreateOutgoingPhysicalMessageContext(
                messageBody: Array.Empty<byte>(),
                routingStrategies: context.RoutingStrategies,
                sourceContext: context);
            await stage(emptyMessageContext)
                .ConfigureAwait(false);
            return;
        }

        var rawMessage = context.Message.Instance as RawMessage;
        if (rawMessage == null)
        {
            // Should not happen. Fail loudly instead of silently dropping the message.
            throw new InvalidOperationException(
                $"Expected a {nameof(RawMessage)} but got {context.Message.MessageType.FullName}.");
        }

        // Make the message look like it was produced by a regular endpoint.
        var headers = context.Headers;
        headers[Headers.ContentType] = rawMessage.Headers[Headers.ContentType];
        headers[Headers.EnclosedMessageTypes] = rawMessage.Headers[Headers.EnclosedMessageTypes];
        headers[Headers.MessageId] = rawMessage.MessageId;
        if (rawMessage.ReplyToAddress != null)
        {
            headers[Headers.ReplyToAddress] = rawMessage.ReplyToAddress;
        }

        var physicalMessageContext = this.CreateOutgoingPhysicalMessageContext(
            messageBody: rawMessage.Body,
            routingStrategies: context.RoutingStrategies,
            sourceContext: context);
        await stage(physicalMessageContext)
            .ConfigureAwait(false);
    }
}

// Replaces the regular deserialization stage: wraps the incoming body and
// headers in a RawMessage instead of deserializing to a .NET type.
class RawDeserializeConnector :
    StageConnector<IIncomingPhysicalMessageContext, IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> stage)
    {
        var rawMessage = new RawMessage(context.MessageId, context.MessageHeaders, context.Message.Body)
        {
            ReplyToAddress = context.ReplyToAddress
        };

        // Every incoming message is presented to the handlers as a RawMessage.
        var logicalMessage = new LogicalMessage(new MessageMetadata(typeof(RawMessage)), rawMessage);
        var logicalMessageContext = this.CreateIncomingLogicalMessageContext(logicalMessage, context);
        await stage(logicalMessageContext)
            .ConfigureAwait(false);
    }
}

We do have a SendOptions setting for the message ID, so there is no great concern if you are overwriting it. I would review the documentation: Message Identity (NServiceBus, Particular Docs)
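As a minimal sketch of that option, assuming an IMessageSession is available and routing for the message is configured elsewhere: GatewaySender and SendWithId are hypothetical names used only for this example, while SetMessageId is the setting on SendOptions referred to above.

```csharp
using System.Threading.Tasks;
using NServiceBus;

public static class GatewaySender
{
    // Sends a message with a caller-supplied message ID instead of the
    // one NServiceBus would otherwise generate.
    public static Task SendWithId(IMessageSession session, object message, string messageId)
    {
        var options = new SendOptions();
        options.SetMessageId(messageId);
        return session.Send(message, options);
    }
}
```

With this approach the ID would not have to be written into the headers by the custom serializer.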

My concern is that you are replacing the existing deserialization stage. If a bug fix related to that stage were released, your replacement would not pick it up, and that could adversely affect your system.