Nservicebus rabbitmq integration with json serialization results in xmlserialization exception

nservicebus

(balint) #1

**************sender:
static void Main()
{
string _wc = string.Empty;

    string connectionName = "Tamaki.Sender";
    Console.Title = connectionName;

    Console.WriteLine("Press k to kill, s to start job, c to cancel");
    var P = new Program();

    string payload = string.Empty;
    while (true)
    {
        ConsoleKeyInfo key = Console.ReadKey();
        Console.WriteLine();

        if (key.Key == ConsoleKey.K)
        {
            return;
        }

        var startJob = new StartJob()
        {
            InstanceId = 1,
            JobRequestId = 2
        };

        var jObjectFromMessage = JObject.FromObject(startJob);
        var typeName = typeof(StartJob).FullName;
        //jObjectFromMessage.AddFirst(new JProperty("$type", typeName));
        var serializedMessage = jObjectFromMessage.ToString();


        string messageId = Guid.NewGuid().ToString();
        string routingKey = "Tamaki.Fibonacci";
        P.Publish(messageId, serializedMessage, connectionName, routingKey, _wc, typeName);
    }
}

public void Publish(string messageId, string payload, string connectionName, string routingKey, string wc, string typeName)
{
    var connectionFactory = new ConnectionFactory();
    using (IConnection connection = connectionFactory.CreateConnection(connectionName))
    {
        using (IModel channel = connection.CreateModel())
        {
            IBasicProperties properties = channel.CreateBasicProperties();
            properties.MessageId = messageId;
            properties.ContentType = "application/json";
            properties.Headers = new Dictionary<string, object> { { "NServiceBus.EnclosedMessageTypes", typeName }, { "NServiceBus.ContentType", "application/json" } };

            channel.BasicPublish(
                exchange: string.Empty,
                routingKey: routingKey,
                mandatory: false,
                basicProperties: properties,
                body: Encoding.UTF8.GetBytes(payload));

            Console.WriteLine($"Message with id {messageId} sent to queue, work command: {wc} ");
        }
    }
}

public enum WorkCommand
{
    Start,
    Cancel
}

******************receiver endpoint config:
static async Task Main(string[] args)
{
var endpointConfiguration = new EndpointConfiguration(“Tamaki.Fibonacci”);

        var externalNewtonsoftJson = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
        externalNewtonsoftJson.ContentTypeKey("NewtonsoftJson");


        var externalNewtonsoftBson = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
        externalNewtonsoftBson.ReaderCreator(stream => new BsonDataReader(stream));
        externalNewtonsoftBson.WriterCreator(stream => new BsonDataWriter(stream));
        externalNewtonsoftBson.ContentTypeKey("NewtonsoftBson");

        endpointConfiguration.RegisterComponents(
            registration: components =>
                          {
                              components.ConfigureComponent<IncomingMessageBodyWriter>(DependencyLifecycle.InstancePerCall);
                          });

        TransportExtensions<RabbitMQTransport> transport = endpointConfiguration.UseTransport<RabbitMQTransport>();

        transport.ConnectionString("host=localhost");

        endpointConfiguration.SendFailedMessagesTo("error");
        endpointConfiguration.EnableInstallers();
        endpointConfiguration.UsePersistence<InMemoryPersistence>();

        IEndpointInstance endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

        var jobMessage = new StartJob()
        {
            InstanceId = 2,
            JobRequestId = 1
        };
        await endpointInstance.SendLocal(jobMessage).ConfigureAwait(false);

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
        await endpointInstance.Stop().ConfigureAwait(false);
    }

********************mutator shows body correctly
public class IncomingMessageBodyWriter : IMutateIncomingTransportMessages
{
static ILog log = LogManager.GetLogger();

public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
    var bodyAsString = Encoding.UTF8
                               .GetString(context.Body);
    var contentType = context.Headers.ContainsKey(Headers.ContentType) ? context.Headers[Headers.ContentType] : string.Empty;
    log.Info($"ContentType '{contentType}'. Serialized Message Body:\r\n{bodyAsString}");
    return Task.CompletedTask;
}

}

Note: first message from receiver works fine but any message from sender is sent through the xml serializer. Result is an xml serializer exception.
2018-05-22 05:42:15.360 ERROR NServiceBus.RecoverabilityExecutor Moving message ‘cc64bfb9-5280-4dd7-8175-906f842365a8’ to the error queue ‘error’ because processing failed due to an exception:
NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from transport message cc64bfb9-5280-4dd7-8175-906f842365a8 —> System.Xml.XmlException: Data at the root level is invalid. Line 1, position 1.
at System.Xml.XmlTextReaderImpl.Throw(Exception e)
at System.Xml.XmlTextReaderImpl.Throw(String res, String arg)
at System.Xml.XmlTextReaderImpl.ParseRootLevelWhitespace()
at System.Xml.XmlTextReaderImpl.ParseDocumentContent()
at System.Xml.XmlTextReaderImpl.Read()
at System.Xml.XmlLoader.Load(XmlDocument doc, XmlReader reader, Boolean preserveWhitespace)
at System.Xml.XmlDocument.Load(XmlReader reader)
at NServiceBus.XmlDeserialization.ReadStreamIntoDocument(Stream stream, Boolean sanitizeInput) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Serializers\XML\XmlDeserialization.cs:line 80
at NServiceBus.XmlDeserialization.Deserialize(Stream stream, IList1 messageTypesToDeserialize) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Serializers\XML\XmlDeserialization.cs:line 32 at NServiceBus.XmlMessageSerializer.Deserialize(Stream stream, IList1 messageTypesToDeserialize) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Serializers\XML\XmlMessageSerializer.cs:line 61
at NServiceBus.DeserializeLogicalMessagesConnector.Extract(IncomingMessage physicalMessage) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 119
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 46
— End of inner exception stack trace —
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 50
at NServiceBus.DeserializeLogicalMessagesConnector.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 29
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.ReceivePerformanceDiagnosticsBehavior.d__2.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Performance\Statistics\ReceivePerformanceDiagnosticsBehavior.cs:line 40
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.MutateIncomingTransportMessageBehavior.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\MessageMutators\MutateTransportMessage\MutateIncomingTransportMessageBehavior.cs:line 43
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.ProcessingStatisticsBehavior.d__0.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Performance\Statistics\ProcessingStatisticsBehavior.cs:line 27
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.TransportReceiveToPhysicalMessageProcessingConnector.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\TransportReceiveToPhysicalMessageProcessingConnector.cs:line 37
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.MainPipelineExecutor.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\MainPipelineExecutor.cs:line 32
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at NServiceBus.Transport.RabbitMQ.MessagePump.d__29.MoveNext() in C:\BuildAgent\work\a9e6741f41af7061\src\NServiceBus.RabbitMQ\Receiving\MessagePump.cs:line 248


(Andreas Öhlund) #2

You need to call .UseSerialization<NewtonsoftSerializer> to set the main serializer since AddDeserializer just adds extra de-serializers.

https://docs.particular.net/nservicebus/serialization/?#specifying-additional-deserializers

Does this make sense?


(balint) #3

I knew it was something easy like that! Works!
answer:
var endpointConfiguration = new EndpointConfiguration(“Tamaki.Fibonacci”);
endpointConfiguration.UseSerialization();
var externalNewtonsoftJson = endpointConfiguration.AddDeserializer();
externalNewtonsoftJson.ContentTypeKey(“NewtonsoftJson”);