**************sender:
static void Main()
{
string _wc = string.Empty;
string connectionName = "Tamaki.Sender";
Console.Title = connectionName;
Console.WriteLine("Press k to kill, s to start job, c to cancel");
var P = new Program();
string payload = string.Empty;
while (true)
{
ConsoleKeyInfo key = Console.ReadKey();
Console.WriteLine();
if (key.Key == ConsoleKey.K)
{
return;
}
var startJob = new StartJob()
{
InstanceId = 1,
JobRequestId = 2
};
var jObjectFromMessage = JObject.FromObject(startJob);
var typeName = typeof(StartJob).FullName;
//jObjectFromMessage.AddFirst(new JProperty("$type", typeName));
var serializedMessage = jObjectFromMessage.ToString();
string messageId = Guid.NewGuid().ToString();
string routingKey = "Tamaki.Fibonacci";
P.Publish(messageId, serializedMessage, connectionName, routingKey, _wc, typeName);
}
}
public void Publish(string messageId, string payload, string connectionName, string routingKey, string wc, string typeName)
{
var connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection(connectionName))
{
using (IModel channel = connection.CreateModel())
{
IBasicProperties properties = channel.CreateBasicProperties();
properties.MessageId = messageId;
properties.ContentType = "application/json";
properties.Headers = new Dictionary<string, object> { { "NServiceBus.EnclosedMessageTypes", typeName }, { "NServiceBus.ContentType", "application/json" } };
channel.BasicPublish(
exchange: string.Empty,
routingKey: routingKey,
mandatory: false,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(payload));
Console.WriteLine($"Message with id {messageId} sent to queue, work command: {wc} ");
}
}
}
public enum WorkCommand
{
Start,
Cancel
}
******************receiver endpoint config:
static async Task Main(string[] args)
{
var endpointConfiguration = new EndpointConfiguration(“Tamaki.Fibonacci”);
var externalNewtonsoftJson = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
externalNewtonsoftJson.ContentTypeKey("NewtonsoftJson");
var externalNewtonsoftBson = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
externalNewtonsoftBson.ReaderCreator(stream => new BsonDataReader(stream));
externalNewtonsoftBson.WriterCreator(stream => new BsonDataWriter(stream));
externalNewtonsoftBson.ContentTypeKey("NewtonsoftBson");
endpointConfiguration.RegisterComponents(
registration: components =>
{
components.ConfigureComponent<IncomingMessageBodyWriter>(DependencyLifecycle.InstancePerCall);
});
TransportExtensions<RabbitMQTransport> transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=localhost");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
IEndpointInstance endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
var jobMessage = new StartJob()
{
InstanceId = 2,
JobRequestId = 1
};
await endpointInstance.SendLocal(jobMessage).ConfigureAwait(false);
Console.WriteLine("Press any key to exit");
Console.ReadKey();
await endpointInstance.Stop().ConfigureAwait(false);
}
********************mutator shows body correctly
public class IncomingMessageBodyWriter : IMutateIncomingTransportMessages
{
static ILog log = LogManager.GetLogger();
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
var bodyAsString = Encoding.UTF8
.GetString(context.Body);
var contentType = context.Headers.ContainsKey(Headers.ContentType) ? context.Headers[Headers.ContentType] : string.Empty;
log.Info($"ContentType '{contentType}'. Serialized Message Body:\r\n{bodyAsString}");
return Task.CompletedTask;
}
}
Note: first message from receiver works fine but any message from sender is sent through the xml serializer. Result is an xml serializer exception.
2018-05-22 05:42:15.360 ERROR NServiceBus.RecoverabilityExecutor Moving message ‘cc64bfb9-5280-4dd7-8175-906f842365a8’ to the error queue ‘error’ because processing failed due to an exception:
NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from transport message cc64bfb9-5280-4dd7-8175-906f842365a8 —> System.Xml.XmlException: Data at the root level is invalid. Line 1, position 1.
at System.Xml.XmlTextReaderImpl.Throw(Exception e)
at System.Xml.XmlTextReaderImpl.Throw(String res, String arg)
at System.Xml.XmlTextReaderImpl.ParseRootLevelWhitespace()
at System.Xml.XmlTextReaderImpl.ParseDocumentContent()
at System.Xml.XmlTextReaderImpl.Read()
at System.Xml.XmlLoader.Load(XmlDocument doc, XmlReader reader, Boolean preserveWhitespace)
at System.Xml.XmlDocument.Load(XmlReader reader)
at NServiceBus.XmlDeserialization.ReadStreamIntoDocument(Stream stream, Boolean sanitizeInput) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Serializers\XML\XmlDeserialization.cs:line 80
at NServiceBus.XmlDeserialization.Deserialize(Stream stream, IList1 messageTypesToDeserialize) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Serializers\XML\XmlDeserialization.cs:line 32 at NServiceBus.XmlMessageSerializer.Deserialize(Stream stream, IList
1 messageTypesToDeserialize) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Serializers\XML\XmlMessageSerializer.cs:line 61
at NServiceBus.DeserializeLogicalMessagesConnector.Extract(IncomingMessage physicalMessage) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 119
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 46
— End of inner exception stack trace —
at NServiceBus.DeserializeLogicalMessagesConnector.ExtractWithExceptionHandling(IncomingMessage message) in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 50
at NServiceBus.DeserializeLogicalMessagesConnector.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 29
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.ReceivePerformanceDiagnosticsBehavior.d__2.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Performance\Statistics\ReceivePerformanceDiagnosticsBehavior.cs:line 40
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.MutateIncomingTransportMessageBehavior.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\MessageMutators\MutateTransportMessage\MutateIncomingTransportMessageBehavior.cs:line 43
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.ProcessingStatisticsBehavior.d__0.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Performance\Statistics\ProcessingStatisticsBehavior.cs:line 27
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.TransportReceiveToPhysicalMessageProcessingConnector.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\Incoming\TransportReceiveToPhysicalMessageProcessingConnector.cs:line 37
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.MainPipelineExecutor.d__1.MoveNext() in C:\BuildAgent\work\b549d46003942065\src\NServiceBus.Core\Pipeline\MainPipelineExecutor.cs:line 32
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at NServiceBus.Transport.RabbitMQ.MessagePump.d__29.MoveNext() in C:\BuildAgent\work\a9e6741f41af7061\src\NServiceBus.RabbitMQ\Receiving\MessagePump.cs:line 248