Hook into pipeline for conversation id logging


(Philipp Dolder) #1

I want to log the ConversationId with every log message that is written within this conversation to be able to group everything together.

We’re using log4net for logging. I assume the solution will be similar for other logging libraries.
I’m setting a property on the logical thread context to capture the conversation id like so

log4net.LogicalThreadContext.Properties["ConversationId"] = conversationId;

Now my goal is to hook into the pipeline as soon as possible, so that messages logged by NServiceBus also get the conversation id assigned and end up having the conversation id as a value in the log message.

I know of the Mutators and Behaviors to hook into the pipeline. I’m just not sure where the best place is to hook into. I also didn’t see a way for me to tell NServiceBus "Hey, I want this behavior to be the first in the pipeline (e.g. for IncomingPhysicalMessage). I know that Mutators don’t have a particular order, so I imagein it would be better to use a Behavior for this.

I came up with this code

public class LogConversationIdBehavior : Behavior<IIncomingPhysicalMessageContext>
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
        string conversationId = context.MessageHeaders[Headers.ConversationId];
        log4net.LogicalThreadContext.Properties["ConversationId"] = conversationId;
        return next();


endpointConfiguration.Pipeline.Register(new LogConversationIdBehavior(), "Adds the conversation id to the logging context");

to register it.

In the docs I understand that ITransportReceiveContext is coming even earlier in the pipeline, but due to its purpose I’m very hesitant in implementing it as a behavior in this stage.

I also read about how this could be done with Serilog https://docs.particular.net/nservicebus/logging/serilog. But as I understand this approach is specific to Serilog.

My questions:

  1. According to the documentation every message has the Headers.ConversationId header set, so I can safely access the key in the dictionary without checking, right?
  2. Where is the best (intended) way to hook into the incoming pipeline?
  3. I don’t need to hook into the outgoing pipeline because everything is happening inside the same logical thread context? Unless I manually add some parallel processing of course.

(Mike Minutillo) #2
  1. You should not rely on the ConversationId header being set. It’s possible that some other external integration point could be set up to send messages to this endpoint and you probably do not want the message to fail processing if this header is not present, which is what will happen if your behavior throws an exception. This will trigger immediate and delayed retries until finally the message ends up in the error queue. Depending on your situation, this may be appropriate and desired. I leave that up to you.
  2. I can’t think of a good reason that you shouldn’t set up in the ITransportReceiveContext. It has the IncomingMessage, including the headers, so if you want to look for conversation id here, you can. This will guarantee that your behavior executes before the behaviors in the Incoming Logical/Physical Message contexts. The best way to set up your behavior is with a feature. If you create a feature and enable it by default, then it will get installed as long as the type scanner picks it up.
  3. You are correct. As long as you are awaiting outgoing sends, publishes, and replies properly (which you need to anyway) then the context should flow through from the incoming message. If you do anything from a background thread then you are no longer in the same context, so there is no incoming message to deal with anyway.

Let me know if you have any more questions.

(Ramon Smits) #3

You probably would want to log the message identifier too:

I also once wrote a similar extension for NLog: