Some questions regarding NServiceBus, sagas, throttling, the RabbitMQ transport, and resilience.
- What happened to NServiceBus.Transports.RabbitMQ? It is no longer on NuGet.
- I implemented throttling (Message Throughput Throttling • NServiceBus Samples • Particular Docs) as described.
This works when I put multiple job request messages on the queue; however, now I can't interrupt a running job to cancel it.
Is there any way to interrupt the saga with a high priority message?
Any thoughts?
- I implemented the job handler in the same process as my saga. Is this the right way to do this?
- How do you scale workers? Should the Saga be scaled 1:1 with a worker?
- Part of my POC is to prove that if a handler goes down, the message will be picked up by another process when available.
So during my demo I will kill a process and start a new one to prove resilience. With limited testing, this seems to be working. I implemented the MongoDB persistence in order to do this.
Is this the best way to do this?
Just in case, here is my configuration for the Saga and Worker process:
namespace Tamaki.Fibonacci
{
    /// <summary>
    /// Console host for the "Tamaki.Fibonacci" endpoint. The same process hosts
    /// both the saga and the worker handler.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Configures and starts the endpoint, waits for a key press, then stops it.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            // A fresh GUID per launch makes each console window distinguishable
            // when several instances of the endpoint run side by side.
            // (The original used typographic quotes here, which do not compile.)
            Console.Title = $"Tamaki.Fibonacci {Guid.NewGuid()}";

            var endpointConfiguration = new EndpointConfiguration("Tamaki.Fibonacci");

            // Process a single message at a time in this instance.
            // NOTE(review): with a concurrency of 1, a CancelJob message cannot be
            // handled while a CreateSequence handler is still running in this process.
            endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
            endpointConfiguration.Pipeline.Register<ThrottlingRegistration>();

            // Newtonsoft JSON is the main serializer; the same serializer is also
            // registered as an additional deserializer under the "NewtonsoftJson"
            // content-type key.
            endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
            var externalNewtonsoftJson = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
            externalNewtonsoftJson.ContentTypeKey("NewtonsoftJson");

            endpointConfiguration.RegisterComponents(
                registration: components =>
                {
                    components.ConfigureComponent<IncomingMessageBodyWriter>(DependencyLifecycle.InstancePerCall);
                });

            TransportExtensions<RabbitMQTransport> transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=localhost");
            transport.Routing().RouteToEndpoint(typeof(CreateSequence), "Tamaki.Fibonacci");

            endpointConfiguration.SendFailedMessagesTo("error-fibonacci");
            endpointConfiguration.EnableInstallers();

            // Saga state is stored in MongoDB.
            var persistence = endpointConfiguration.UsePersistence<MongoDbPersistence>();
            persistence.SetConnectionString("mongodb://localhost:27017/Tamaki");

            IEndpointInstance endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();

            await endpointInstance.Stop().ConfigureAwait(false);
        }
    }
}
And here is my Saga:
namespace Tamaki.Fibonacci
{
    /// <summary>
    /// Saga that coordinates one job request. It is started by either
    /// <c>StartJob</c> or <c>CancelJob</c> (both correlated on <c>JobRequestId</c>),
    /// dispatches a <c>CreateSequence</c> command to the local endpoint, and
    /// publishes <c>JobRequestComplete</c> when the <c>CreateSequenceReply</c> arrives.
    /// </summary>
    public class ProcessRequestSaga : Saga<ProcessRequestSaga.JobRequestData>,
        IAmStartedByMessages<StartJob>,
        IAmStartedByMessages<CancelJob>,
        IHandleMessages<CreateSequenceReply>
    {
        static readonly ILog log = LogManager.GetLogger<ProcessRequestSaga>();

        /// <summary>
        /// Records the request identifiers in the saga data and sends a
        /// <c>CreateSequence</c> command to this endpoint.
        /// </summary>
        /// <param name="message">The job request to start.</param>
        /// <param name="context">Handler context used to send the command.</param>
        public async Task Handle(StartJob message, IMessageHandlerContext context)
        {
            Data.JobRequestId = message.JobRequestId;
            Data.InstanceId = message.InstanceId;

            var createSequence = new CreateSequence
            {
                InstanceId = message.InstanceId,
                JobRequestId = message.JobRequestId
            };
            await context.SendLocal(createSequence).ConfigureAwait(false);

            // The work has only been dispatched at this point, so the log line
            // says "started" rather than "complete".
            log.Info($"JobRequest #{message.JobRequestId} started.");
        }

        /// <summary>
        /// Records the request identifiers in the saga data and publishes
        /// <c>JobRequestCancelled</c>.
        /// </summary>
        /// <param name="message">The cancellation request.</param>
        /// <param name="context">Handler context used to publish the event.</param>
        public async Task Handle(CancelJob message, IMessageHandlerContext context)
        {
            Data.JobRequestId = message.JobRequestId;
            Data.InstanceId = message.InstanceId;
            log.Info($"JobRequest #{message.JobRequestId} was cancelled.");

            var jobRequestCancelled = new JobRequestCancelled
            {
                JobRequestId = message.JobRequestId,
                InstanceId = message.InstanceId
            };

            // The publish must be awaited: the original dropped the returned Task,
            // so a failed publish would never be observed by the handler.
            await context.Publish(jobRequestCancelled).ConfigureAwait(false);
        }

        /// <summary>
        /// Correlates <c>StartJob</c> and <c>CancelJob</c> to saga data through
        /// <c>JobRequestId</c>. <c>CreateSequenceReply</c> has no mapping here.
        /// </summary>
        /// <param name="mapper">Mapper supplied by NServiceBus.</param>
        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<JobRequestData> mapper)
        {
            mapper.ConfigureMapping<StartJob>(message => message.JobRequestId)
                .ToSaga(sagaData => sagaData.JobRequestId);
            mapper.ConfigureMapping<CancelJob>(message => message.JobRequestId)
                .ToSaga(sagaData => sagaData.JobRequestId);
        }

        /// <summary>
        /// Publishes <c>JobRequestComplete</c> for the finished sequence and ends the saga.
        /// </summary>
        /// <param name="message">The worker's reply.</param>
        /// <param name="context">Handler context used to publish the event.</param>
        public async Task Handle(CreateSequenceReply message, IMessageHandlerContext context)
        {
            var jobRequestComplete = new JobRequestComplete
            {
                InstanceId = message.InstanceId,
                JobRequestId = message.JobRequestId
            };
            await context.Publish(jobRequestComplete).ConfigureAwait(false);

            // The reply is the last step of the workflow; without this call the
            // saga data would stay in the persistence store indefinitely.
            MarkAsComplete();
        }

        /// <summary>
        /// State persisted for each saga instance.
        /// </summary>
        public class JobRequestData : ContainSagaData
        {
            /// <summary>Correlation property for StartJob and CancelJob.</summary>
            public int JobRequestId { get; set; }

            public int InstanceId { get; set; }

            // NOTE(review): presumably used by the MongoDB persistence for
            // optimistic concurrency - confirm against the persistence package.
            [DocumentVersion]
            public int Version { get; set; }
        }

        /// <summary>
        /// Invoked when a message cannot be matched to a saga instance. Logs
        /// <c>CreateSequenceReply</c> messages and replies with
        /// <see cref="SagaDisappearedMessage"/>.
        /// </summary>
        public class SagaNotFoundHandler : IHandleSagaNotFound
        {
            /// <summary>
            /// Logs the orphaned reply (if it is one) and replies to the sender.
            /// </summary>
            /// <param name="message">The message that found no saga.</param>
            /// <param name="context">Processing context used to reply.</param>
            public Task Handle(object message, IMessageProcessingContext context)
            {
                if (message is CreateSequenceReply response)
                {
                    log.Info($"Saga not found #{response.JobRequestId}.");
                }

                var sagaDisappearedMessage = new SagaDisappearedMessage();
                return context.Reply(sagaDisappearedMessage);
            }
        }

        /// <summary>
        /// Reply sent by <see cref="SagaNotFoundHandler"/>.
        /// </summary>
        public class SagaDisappearedMessage
        {
        }
    }
}
And here is my worker:
namespace Tamaki.Fibonacci
{
    /// <summary>
    /// Worker that handles <c>CreateSequence</c>: steps through the Fibonacci
    /// sequence one term per second and replies with the resulting term.
    /// </summary>
    class FibonacciHandler : IHandleMessages<CreateSequence>
    {
        static readonly ILog log = LogManager.GetLogger<FibonacciHandler>();

        /// <summary>
        /// Advances the sequence <c>message.JobRequestId</c> times, logging each
        /// step, then replies with a <c>CreateSequenceReply</c> carrying the result.
        /// </summary>
        /// <param name="message">The command; its JobRequestId is the iteration count.</param>
        /// <param name="context">Handler context used to send the reply.</param>
        public async Task Handle(CreateSequence message, IMessageHandlerContext context)
        {
            int a = 0;
            int b = 1;
            int n = message.JobRequestId;

            // NOTE(review): a and b are 32-bit ints and wrap silently once the
            // terms exceed int.MaxValue (from roughly the 47th term onwards).
            for (int i = 0; i < n; i++)
            {
                int temp = a;
                a = b;
                b = temp + b;
                log.Info($"Fibonacci sequence {b}");

                // Simulates slow work. Task.Delay yields the thread instead of
                // blocking it the way Thread.Sleep does inside an async handler.
                await Task.Delay(1000).ConfigureAwait(false);
            }

            log.Info($"Received Job Request #{message.JobRequestId} for instance #{message.InstanceId}.");

            var reply = new CreateSequenceReply
            {
                InstanceId = message.InstanceId,
                JobRequestId = message.JobRequestId,
                Result = a
            };
            await context.Reply(reply).ConfigureAwait(false);
        }
    }
}