Handling a message produced by MassTransit

Our team introduced the first distributed architecture in our organization. We opted to take the path of least resistance (free options) and built it on MassTransit and RabbitMQ. Now that the organization has bought into a distributed architecture, I would like to evaluate other technologies (including paid options) to see what they have to offer. I am currently working on a POC to integrate NServiceBus into our existing architecture. Our current architecture is built on a MassTransit saga that sends commands to various consumers based on incoming events. I would like to start by shutting down one of those consumers and plugging in an NServiceBus handler to handle the message instead.

Here’s my endpoint config:

    var endPointName = "mapplandetailsfromplandocument";

    var endpointConfiguration = new EndpointConfiguration(endPointName);
    endpointConfiguration
        .EnableInstallers(); // Installers ensure that endpoint-specific artifacts (e.g. database tables, queues, directories, etc.) are created and configured

    endpointConfiguration
        .UseTransport<RabbitMQTransport>()
        .ConnectionString("amqp://guest:guest@localhost:5675")
        .UseConventionalRoutingTopology();

    endpointConfiguration.UseSerialization<NewtonsoftSerializer>();

    // NServiceBus needs to know the kind of message (command, event, etc.). A message type can
    // either implement an NServiceBus interface (e.g. ICommand) or be matched by a convention.
    // https://docs.particular.net/nservicebus/messaging/conventions

    var conventions = endpointConfiguration.Conventions();
    conventions.DefiningCommandsAs(type => type.Namespace == "RADAR.Messaging.Commands");
    conventions.DefiningEventsAs(type => type.Namespace == "RADAR.Messaging.Events");

    return endpointConfiguration;

Here’s my handler:

    public class MapPlanDetailsFromPlanDocumentConsumer : IHandleMessages<MapPlanDetailsFromPlanDocument>
    {
        public Task Handle(MapPlanDetailsFromPlanDocument message, IMessageHandlerContext context)
        {
            Console.WriteLine("Handling message...");
            return Task.CompletedTask;
        }
    }

Here’s my message (defined in a separate assembly):

    public class MapPlanDetailsFromPlanDocument : IMapPlanDetailsFromPlanDocument
    {
        /// <inheritdoc cref="IPlanMessage.PlanId"/>
        public string PlanId { get; set; }
    }

Here’s the message serialized by MassTransit:

    {
      "messageId": "7a000000-9a3c-0005-38b5-08d931cb3555",
      "correlationId": "208d7f02-7e4b-4d76-acb7-82d36bc86791",
      "conversationId": "208d7f02-7e4b-4d76-acb7-82d36bc86791",
      "initiatorId": "208d7f02-7e4b-4d76-acb7-82d36bc86791",
      "sourceAddress": "rabbitmq://localhost/plancomparesaga",
      "destinationAddress": "amqp://localhost:5675/mapplandetailsfromplandocument",
      "messageType": [
        "urn:message:RADAR.Messaging.Commands:MapPlanDetailsFromPlanDocument",
        "urn:message:RADAR.Messaging.Commands:IMapPlanDetailsFromPlanDocument",
        "urn:message:RADAR.Messaging:IPlanMessage"
      ],
      "message": {
        "planId": "000159"
      },
      "sentTime": "2021-06-17T20:05:09.2535477Z",
      "headers": {},
      "host": {
        "machineName": "LT001351",
        "processName": "RADAR.PlanCompare.Saga",
        "processId": 36364,
        "assembly": "RADAR.PlanCompare.Saga",
        "assemblyVersion": "1.0.0.0",
        "frameworkVersion": "3.1.15",
        "massTransitVersion": "7.1.8.0",
        "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
      }
    }

This is the error message when the handler tries to handle the message:

NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from incoming physical message 7a000000-9a3c-0005-a6fe-08d931d763b5
           ---> System.Exception: Could not find metadata for 'Newtonsoft.Json.Linq.JObject'.
          Ensure the following:
          1. 'Newtonsoft.Json.Linq.JObject' is included in initial scanning.
          2. 'Newtonsoft.Json.Linq.JObject' implements either 'IMessage', 'IEvent' or 'ICommand' or alternatively, if you don't want to implement an interface, you can use 'Unobtrusive Mode'.

How can I handle this message without the sender modifying the message in any way?

Hi @gannawdm,

I’m not really familiar with the message format and serialization in MassTransit, but in general, NServiceBus primarily needs a hint about the message type involved, which is typically provided in the message headers.

Since you don’t want to or can’t modify the message or the sender, the challenge is to add this information on the receiving side. This can be done via a custom behavior that runs as part of the message processing pipeline and supplies the missing header. An oversimplified version could look like this:

    public class DemoBehavior : Behavior<ITransportReceiveContext>
    {
        public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
        {
            context.Message.Headers.Add(Headers.EnclosedMessageTypes, typeof(MapPlanDetailsFromPlanDocument).AssemblyQualifiedName);

            // you can also access the raw RabbitMQ message directly
            var rabbitMqMessage = context.Extensions.Get<BasicDeliverEventArgs>();

            return next();
        }
    }

You then need to register the behavior with your endpoint using:

    endpointConfiguration.Pipeline.Register(new DemoBehavior(), "adds message type header information");

With this in place, the rest is up to Newtonsoft.Json and your message definition. For example, you can define an InitiatorId property on the message type, and Newtonsoft.Json should map it from the payload’s data.

We do have some documentation that might be helpful on this topic:

Let me know if that helps.

The message format is shown in my original post. As you can see, there is a “messageType” property with the types specified as URNs. Rather than adding the expected header to the message, is there a way to determine the type by looking at the “messageType” property?

I think that will be fairly difficult. Although you can access the raw RabbitMQ message via the behavior I mentioned previously, the actual message body (that you’re showing) is a byte array, so it needs to be deserialized before you can read it. You could of course deserialize it yourself to get the information, but then the message would be deserialized twice, which is slow and inefficient.

Off the top of my head, I’d suggest one of the following two options:

  • Add additional information to the message properties, which can be accessed quite easily without any deserialization (but of course requires some sort of modification outside the receiver).
  • MAYBE a custom deserializer could work, but to be honest, I currently have no idea whether this is a viable option and whether it would let you circumvent the message type identification part. Writing a custom deserializer might also not be the easiest thing with little NServiceBus experience, so that could be something to reach out to Particular Software about, as Particular offers help with proofs of concept.

I was able to get the Handler to fire by using the custom behavior to extract the type from the messageType property and add the appropriate header.

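    using System;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Newtonsoft.Json;
    using NServiceBus;
    using NServiceBus.Pipeline;

    public class MassTransitMessageAdapter : Behavior<ITransportReceiveContext>
    {
        public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
        {
            // Decode explicitly as UTF-8 rather than relying on the platform default encoding.
            var rawMessage = Encoding.UTF8.GetString(context.Message.Body);

            var massTransitMessage = JsonConvert.DeserializeObject<MassTransitMessage>(rawMessage);

            // The MassTransit messageType property is an array that lists the concrete class and
            // all interfaces that are compatible with the message. This extracts the concrete
            // type by assuming that interface names start with 'I' and the concrete type's
            // name does not.
            var concreteType = massTransitMessage.messageType
                .Single(t => !t.Split(':').Last().StartsWith('I'))
                .Replace("urn:message:", string.Empty)
                .Replace(':', '.');

            // Use the indexer so an existing header is overwritten instead of throwing.
            context.Message.Headers[Headers.EnclosedMessageTypes] = concreteType;

            return next();
        }
    }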
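
The `MassTransitMessage` type used in the behavior is not shown in the thread. A minimal sketch, assuming it only needs to expose the `messageType` array and the nested `message` object, might look like the following. The class shape, the `MassTransitUrn.UrnToTypeName` helper, and the trimmed sample JSON are all hypothetical and only illustrate the URN-to-type-name conversion described above.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical DTO for the MassTransit JSON envelope. Only the two
// properties the adapter needs are mapped; the rest are ignored.
public class MassTransitMessage
{
    public string[] messageType { get; set; }
    public JToken message { get; set; }
}

public static class MassTransitUrn
{
    // Converts "urn:message:Some.Namespace:TypeName" to "Some.Namespace.TypeName".
    public static string UrnToTypeName(string urn)
    {
        const string prefix = "urn:message:";
        if (!urn.StartsWith(prefix, StringComparison.Ordinal))
            throw new FormatException($"Unexpected message URN: {urn}");
        return urn.Substring(prefix.Length).Replace(':', '.');
    }
}

public static class Demo
{
    public static void Main()
    {
        // Trimmed-down copy of the envelope from the original post.
        var json = @"{
          ""messageType"": [
            ""urn:message:RADAR.Messaging.Commands:MapPlanDetailsFromPlanDocument"",
            ""urn:message:RADAR.Messaging.Commands:IMapPlanDetailsFromPlanDocument""
          ],
          ""message"": { ""planId"": ""000159"" }
        }";

        var envelope = JsonConvert.DeserializeObject<MassTransitMessage>(json);

        Console.WriteLine(MassTransitUrn.UrnToTypeName(envelope.messageType[0]));
        // prints: RADAR.Messaging.Commands.MapPlanDetailsFromPlanDocument

        Console.WriteLine((string)envelope.message["planId"]);
        // prints: 000159
    }
}
```

Typing `message` as `JToken` keeps the DTO independent of any particular command type, so the same envelope class could serve every consumer that gets migrated.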

Now I just need to figure out why it is not deserializing correctly (PlanId is null). The message contains the message body in the “message” property. I wonder if NServiceBus is expecting it in a different property?

Ok. I was able to get it to deserialize. Looks like the MassTransit message has a bunch of “headery” info in the actual payload (ugh) with the actual message contained in the “message” property of the payload (ugh). NServiceBus expects the payload to contain only the message (makes sense!).

So I created a Mutator to set the message Body to the contents of the “message” property.

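    using System.Text;
    using System.Threading.Tasks;
    using Newtonsoft.Json;
    using NServiceBus.MessageMutator;

    public class IncomingMassTransitMessageMutator : IMutateIncomingTransportMessages
    {
        public Task MutateIncoming(MutateIncomingTransportMessageContext context)
        {
            // Decode explicitly as UTF-8 rather than relying on the platform default encoding.
            var rawMessage = Encoding.UTF8.GetString(context.Body);
            var massTransitMessage = JsonConvert.DeserializeObject<MassTransitMessage>(rawMessage);

            // Replace the envelope with just the inner "message" object,
            // which is the payload NServiceBus expects.
            context.Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(massTransitMessage.message));

            return Task.CompletedTask;
        }
    }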
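
The thread does not show how the mutator and the adapter behavior are wired into the endpoint. Assuming NServiceBus 7 (which matches the `byte[]` body used above), registration would presumably look something like this fragment. The `MassTransitInterop.Register` wrapper and the description string are hypothetical; `RegisterMessageMutator` comes from the `NServiceBus.MessageMutator` namespace.

```csharp
using NServiceBus;
using NServiceBus.MessageMutator;

public static class MassTransitInterop
{
    // Hypothetical helper: call this on the EndpointConfiguration from the
    // original post before starting the endpoint.
    public static void Register(EndpointConfiguration endpointConfiguration)
    {
        // Adds the EnclosedMessageTypes header from the envelope's messageType array.
        endpointConfiguration.Pipeline.Register(
            new MassTransitMessageAdapter(),
            "Derives the enclosed message type from the MassTransit envelope");

        // Replaces the envelope body with the inner "message" object.
        endpointConfiguration.RegisterMessageMutator(new IncomingMassTransitMessageMutator());
    }
}
```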

The deserialization uses plain Newtonsoft.Json, which expects the message type to have the same structure as the actual JSON data. So if you want to access the PlanId property in the original payload, you’d also have to define the containing message type and expose the inner message as a property on it. If you need to control how the message is deserialized, that is all done via regular Newtonsoft.Json configuration. We have some documentation on how to configure the JSON serializer in code here: Json.NET Serializer • Newtonsoft Serializer • Particular Docs
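
To illustrate the "containing message type" idea, here is a rough sketch of a wrapper class whose shape mirrors the MassTransit envelope, so that Newtonsoft.Json can map the nested `message` object directly. The `MassTransitEnvelope` name is hypothetical, and with this approach the handler and the `EnclosedMessageTypes` header would presumably have to refer to the wrapper type rather than to `MapPlanDetailsFromPlanDocument` itself.

```csharp
using System;
using Newtonsoft.Json;

public class MapPlanDetailsFromPlanDocument
{
    public string PlanId { get; set; }
}

// Hypothetical wrapper that mirrors the parts of the envelope we care about.
public class MassTransitEnvelope
{
    public Guid InitiatorId { get; set; }
    public MapPlanDetailsFromPlanDocument Message { get; set; }
}

public static class EnvelopeDemo
{
    public static void Main()
    {
        // Trimmed-down copy of the envelope from the original post.
        var json = @"{
          ""initiatorId"": ""208d7f02-7e4b-4d76-acb7-82d36bc86791"",
          ""message"": { ""planId"": ""000159"" }
        }";

        // Json.NET matches property names case-insensitively when deserializing,
        // so "planId" maps to PlanId and "message" to Message.
        var envelope = JsonConvert.DeserializeObject<MassTransitEnvelope>(json);

        Console.WriteLine(envelope.Message.PlanId); // prints: 000159
    }
}
```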

If you only need the information from the message part of the original message, your approach seems to work. As mentioned before, the double deserialization (or rather triple, since your code also serializes it back again) is probably less efficient due to the redundancy, but if it works (and you’re not having performance issues), it works ;)
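
One way to trim that redundancy would be to parse the envelope once and derive both the type name and the inner body from the same parse. The sketch below shows only the parsing part; the `MassTransitEnvelopeReader.Unwrap` helper is hypothetical, and it reuses the "interface names start with I" heuristic from the adapter above. In NServiceBus 7, a single behavior on the incoming physical message stage could presumably call it, set the header, and replace the body (for example via `UpdateMessage`, if your version exposes it), though that wiring is an untested assumption.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class MassTransitEnvelopeReader
{
    // Parses the envelope once and returns the concrete type name together
    // with the inner "message" object re-encoded as UTF-8 JSON bytes.
    public static (string TypeName, byte[] Body) Unwrap(byte[] envelopeBytes)
    {
        JObject envelope;
        using (var reader = new JsonTextReader(new StringReader(Encoding.UTF8.GetString(envelopeBytes))))
        {
            // Keep date-like strings as plain strings so they round-trip unchanged.
            reader.DateParseHandling = DateParseHandling.None;
            envelope = JObject.Load(reader);
        }

        var typeName = envelope["messageType"]
            .Values<string>()
            .Single(t => !t.Split(':').Last().StartsWith("I", StringComparison.Ordinal))
            .Replace("urn:message:", string.Empty)
            .Replace(':', '.');

        var innerJson = envelope["message"].ToString(Formatting.None);
        return (typeName, Encoding.UTF8.GetBytes(innerJson));
    }
}

public static class UnwrapDemo
{
    public static void Main()
    {
        var json = @"{
          ""messageType"": [
            ""urn:message:RADAR.Messaging.Commands:MapPlanDetailsFromPlanDocument"",
            ""urn:message:RADAR.Messaging.Commands:IMapPlanDetailsFromPlanDocument"",
            ""urn:message:RADAR.Messaging:IPlanMessage""
          ],
          ""message"": { ""planId"": ""000159"" }
        }";

        var (typeName, body) = MassTransitEnvelopeReader.Unwrap(Encoding.UTF8.GetBytes(json));

        Console.WriteLine(typeName);
        // prints: RADAR.Messaging.Commands.MapPlanDetailsFromPlanDocument

        Console.WriteLine(Encoding.UTF8.GetString(body));
        // prints: {"planId":"000159"}
    }
}
```

NServiceBus still deserializes the inner body afterwards, so this removes one parse of the envelope rather than all of the overhead.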