NServiceBus questions

Some questions regarding NServiceBus, sagas, throttling, the RabbitMQ transport, and resilience.

  1. What happened to NServiceBus.Transports.RabbitMQ? It no longer appears on NuGet.
  2. I implemented throttling as described in the Message Throughput Throttling sample (NServiceBus Samples, Particular Docs).
    This works when I throw multiple job request messages onto the queue, but now I can't interrupt a job to cancel it.
    Is there any way to interrupt the saga with a high-priority message?
    Any thoughts?
  3. I implemented the job handler in the same process as my saga. Is this the right way to do it?
  4. How do you scale workers? Should the Saga be scaled 1:1 with a worker?
  5. Part of my POC is to prove that if a handler goes down, the message will be picked up by another process when available.
    So during my demo I will kill one process and start another to prove resilience. With limited testing, this seems to be working. I implemented MongoDB persistence in order to do this.
    Is this the best approach?

Just in case, here is my configuration for the Saga and Worker process:
namespace Tamaki.Fibonacci
{
class Program
{
    static async Task Main(string[] args)
    {
        Console.Title = $"Tamaki.Fibonacci {Guid.NewGuid().ToString()}";

        var endpointConfiguration = new EndpointConfiguration("Tamaki.Fibonacci");
        endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
        endpointConfiguration.Pipeline.Register<ThrottlingRegistration>();

        endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
        var externalNewtonsoftJson = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
        externalNewtonsoftJson.ContentTypeKey("NewtonsoftJson");

        endpointConfiguration.RegisterComponents(
            registration: components =>
                          {
                              components.ConfigureComponent<IncomingMessageBodyWriter>(DependencyLifecycle.InstancePerCall);
                          });

        TransportExtensions<RabbitMQTransport> transport = endpointConfiguration.UseTransport<RabbitMQTransport>();

        transport.ConnectionString("host=localhost");

        transport.Routing().RouteToEndpoint(typeof(CreateSequence), "Tamaki.Fibonacci");

        endpointConfiguration.SendFailedMessagesTo("error-fibonacci");
        endpointConfiguration.EnableInstallers();
        var persistence = endpointConfiguration.UsePersistence<MongoDbPersistence>();
        persistence.SetConnectionString("mongodb://localhost:27017/Tamaki");

        IEndpointInstance endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
        await endpointInstance.Stop().ConfigureAwait(false);
    }
}

}

And here is my Saga:
namespace Tamaki.Fibonacci
{
public class ProcessRequestSaga : Saga<ProcessRequestSaga.JobRequestData>,
    IAmStartedByMessages<StartJob>,
    IAmStartedByMessages<CancelJob>,
    IHandleMessages<CreateSequenceReply>
{
    static ILog log = LogManager.GetLogger<ProcessRequestSaga>();

    public async Task Handle(StartJob message, IMessageHandlerContext context)
    {
        Data.JobRequestId = message.JobRequestId;
        Data.InstanceId = message.InstanceId;

        await context.SendLocal(
            new CreateSequence()
            {
                InstanceId = message.InstanceId,
                JobRequestId = message.JobRequestId
            }).ConfigureAwait(false);

        log.Info($"JobRequest #{message.JobRequestId} started.");
    }

    public Task Handle(CancelJob message, IMessageHandlerContext context)
    {
        Data.JobRequestId = message.JobRequestId;
        Data.InstanceId = message.InstanceId;

        log.Info($"JobRequest #{message.JobRequestId} was cancelled.");
        var jobRequestCancelled = new JobRequestCancelled
        {
            JobRequestId = message.JobRequestId,
            InstanceId = message.InstanceId
        };

        return context.Publish(jobRequestCancelled);
    }

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<JobRequestData> mapper)
    {
        mapper.ConfigureMapping<StartJob>(message => message.JobRequestId)
              .ToSaga(sagaData => sagaData.JobRequestId);
        mapper.ConfigureMapping<CancelJob>(message => message.JobRequestId)
              .ToSaga(sagaData => sagaData.JobRequestId);
    }

    public async Task Handle(CreateSequenceReply message, IMessageHandlerContext context)
    {
        await context.Publish(
            new JobRequestComplete()
            {
                InstanceId = message.InstanceId,
                JobRequestId = message.JobRequestId
            }).ConfigureAwait(false);
    }

    public class JobRequestData : ContainSagaData
    {
        public int JobRequestId { get; set; }
        public int InstanceId { get; set; }

        [DocumentVersion]
        public int Version { get; set; }
    }

    public class SagaNotFoundHandler : IHandleSagaNotFound
    {
        public Task Handle(object message, IMessageProcessingContext context)
        {
            var response = message as CreateSequenceReply;
            if (response != null)
            {
                log.Info($"Saga not found #{response.JobRequestId}.");
            }

            var sagaDisappearedMessage = new SagaDisappearedMessage();
            return context.Reply(sagaDisappearedMessage);
        }
    }

    public class SagaDisappearedMessage
    {
    }
}

}

And here is my worker:

namespace Tamaki.Fibonacci
{
class FibonacciHandler : IHandleMessages<CreateSequence>
{
    static ILog log = LogManager.GetLogger<FibonacciHandler>();

    public async Task Handle(CreateSequence message, IMessageHandlerContext context)
    {
        int a = 0;
        int b = 1;
        int n = message.JobRequestId;

        int i = 0;

        while (i < n)
        {
            int temp = a;
            a = b;
            b = temp + b;
            i++;
            log.Info($"Fibonacci sequence {b}");
            Thread.Sleep(1000); // simulate one second of blocking work per iteration
        }

        log.Info($"Completed Job Request #{message.JobRequestId} for instance #{message.InstanceId}.");
        await context.Reply(
            new CreateSequenceReply()
            {
                InstanceId = message.InstanceId,
                JobRequestId = message.JobRequestId,
                Result = a
            });
    }
}

}

Hi Joe,

The official package is NServiceBus.RabbitMQ (currently 8.0.1), and it is still available on NuGet. I'm not sure what you are referring to. Can you elaborate?

I do not yet fully understand why throttling is needed in your example. Can you describe the business scenario that makes you think you need throttling as described in the sample?

In your example, by putting the work handler and the saga in the same endpoint and limiting the concurrency to 1, the endpoint will only ever process one message at a time. The implication is that while the worker is processing a job, the endpoint cannot pick up cancel commands and dispatch them to the saga.

So if the worker process needs some kind of throttling, then I would suggest the following. Have a dedicated endpoint that manages the state of the work with the saga. No concurrency limitation should be applied there, since you always want to be able to receive commands that are correlated to a given saga instance.

Move the worker into a dedicated endpoint. That endpoint can be throughput- and/or concurrency-limited if required; a minimal sketch of the split follows.
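To make that concrete, here is a rough sketch of the orchestrator side, assuming a hypothetical worker endpoint name of "Tamaki.Fibonacci.Worker" (the name and routing are illustrative, not prescriptive):

// Orchestrator endpoint: hosts the saga and routes the work command to a
// dedicated worker queue instead of handling it locally. No concurrency
// limit here, so CancelJob messages can always be dispatched to the saga.
var orchestratorConfiguration = new EndpointConfiguration("Tamaki.Fibonacci");
var transport = orchestratorConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=localhost");
transport.Routing().RouteToEndpoint(typeof(CreateSequence), "Tamaki.Fibonacci.Worker");

In the saga, the StartJob handler would then use context.Send(...) rather than context.SendLocal(...) so that this route takes effect.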

With that approach, the saga can always store the progress of the work and, if a cancellation is received, stop continuing further in the process. There is one caveat though: handler work that is already ongoing cannot be “cancelled” easily. If you want to manage that, you'd need some kind of cancellation service that is shared on the worker endpoint, and that endpoint would need a concurrency greater than one. When the cancellation request is sent to the endpoint, a second handler would indicate on the cancellation service that a request has been cancelled. The handler processing that request would check the cancellation service or token where feasible and cancel the operation if indicated to do so. The problem is that if the shared knowledge for that cancellation is held “in-memory”, then the worker endpoint cannot be scaled out with competing consumers. A sketch of such a service follows.
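As a rough sketch of what such an in-memory cancellation service could look like (the type names are hypothetical, and this only works while the worker endpoint is a single process):

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical in-memory cancellation service, shared by all handlers in
// one worker process. It cannot coordinate cancellation across scaled-out
// worker instances, which is the caveat described above.
public class JobCancellationService
{
    readonly ConcurrentDictionary<int, CancellationTokenSource> jobs =
        new ConcurrentDictionary<int, CancellationTokenSource>();

    // Called by the work handler when it starts processing a job.
    public CancellationToken Register(int jobRequestId)
    {
        return jobs.GetOrAdd(jobRequestId, _ => new CancellationTokenSource()).Token;
    }

    // Called by the cancel handler; signals the token the work handler polls.
    public void Cancel(int jobRequestId)
    {
        if (jobs.TryRemove(jobRequestId, out var cts))
        {
            cts.Cancel();
        }
    }
}

// Runs on a second pipeline thread while the job handler is busy, which is
// why the worker endpoint needs a concurrency limit greater than one.
public class CancelJobHandler : IHandleMessages<CancelJob>
{
    readonly JobCancellationService cancellation;

    public CancelJobHandler(JobCancellationService cancellation)
    {
        this.cancellation = cancellation;
    }

    public Task Handle(CancelJob message, IMessageHandlerContext context)
    {
        cancellation.Cancel(message.JobRequestId);
        return Task.CompletedTask;
    }
}

The work handler would call Register(message.JobRequestId) when it starts and poll token.IsCancellationRequested inside its loop. The service would be registered as a single instance, e.g. components.ConfigureComponent<JobCancellationService>(DependencyLifecycle.SingleInstance), following the RegisterComponents pattern already shown in your configuration.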

Regards
Daniel

Hi Daniel, thanks again for your time.
My design’s underlying goal is to build a very simple, scalable solution that leverages one to many single-threaded workers that I can spawn as the load increases. Each worker would compete for job messages on the queue; as the queue fills with job messages, I can add more workers across my farm to support the load.
Additionally, each worker should have its own process. That way, if one job results in an unhandled exception, another job running in the same process is not adversely affected. There is also the benefit of avoiding resource contention within the process address space; in some cases, a job will be managing large chunks of data in its address space.
Without throttling, I am assuming that multiple jobs would run within one process.

Based on this description, do I still want to separate the saga from the job request handler? And if I do separate them, does my saga-to-job-handler ratio go from 1:1 to 1:n, where n is the number of workers I spawn as the load increases?

Thanks again for discussing this approach,
Joe

Hi Joe,

In your case, I would recommend having an orchestrator endpoint that owns the saga for the orchestration process and sends messages to a worker endpoint with a concurrency limit of one. The single-threaded restriction is imposed on the worker, not on the orchestrator, so the worker gets a dedicated endpoint that can be scaled out via the competing consumers pattern on its queue. Another benefit of this approach is that the worker node doesn't need saga persistence. A sketch of such a worker endpoint follows.
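As a sketch, the worker endpoint configuration could then look roughly like this (again assuming the hypothetical "Tamaki.Fibonacci.Worker" name):

// Dedicated worker endpoint: single-threaded within the process. Scaling out
// means starting more copies of this process; they become competing consumers
// on the same "Tamaki.Fibonacci.Worker" queue.
var workerConfiguration = new EndpointConfiguration("Tamaki.Fibonacci.Worker");
workerConfiguration.LimitMessageProcessingConcurrencyTo(1);

var transport = workerConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=localhost");

workerConfiguration.SendFailedMessagesTo("error-fibonacci");
workerConfiguration.EnableInstallers();
// No saga persistence configured here: only the orchestrator hosts the saga.

var workerInstance = await Endpoint.Start(workerConfiguration).ConfigureAwait(false);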

Not sure about the question regarding the ratio. The picture would look like the following: one orchestrator endpoint hosting the saga, feeding a single worker queue that any number of worker endpoint instances consume as competing consumers.

I hope that clarifies things.

Regards
Daniel

So from this design, you are suggesting one instance of the saga for multiple instances of workers. That seems to require a more complex WorkCoordinator, and I also see a potential single point of failure if the WorkCoordinator goes down.
What if I were to have an instance of a saga per worker, a 1:1 ratio of sagas to workers? That way, if one worker runs into trouble, it doesn't affect the rest of the workers.
Thanks again
Joe

Hi Joe,

The WorkerCoordinator can easily be scaled out if required. You can have as many workers as you want and a few worker coordinators if needed; the worker coordinators act as competing consumers on the coordinator queue.

Happy to discuss this in a call if required. Send me an email at daniel dot marbach at particular dot net and we can set up a call if things are still unclear.

Regards
Daniel