Hi Team,
I am writing a saga that sometimes receives the StopSnapshot command. If it is received, the saga has to wait for the SnapShotCreated event to arrive and then stop any further execution. If SnapShotCreated arrives first, the saga should continue the execution. When I try to do that, I receive the error below. Any help is highly appreciated.
Code:
/// <summary>
/// Saga correlated on <c>InstanceId</c> that can be started by either
/// <see cref="SnapShotCreated"/> or <see cref="StopSnapshot"/>.
/// </summary>
/// <remarks>
/// When <see cref="SnapShotCreated"/> has been received, the saga either logs that it is
/// stopping (if <see cref="StopSnapshot"/> was also received) or sends a delayed
/// <c>StartSnapshot</c>, and then completes. If only <see cref="StopSnapshot"/> has been
/// received so far, the saga stays open.
/// NOTE(review): nothing in this class by itself explains a "saga can't be completed ...
/// BadRequest" failure from the Cosmos DB persister; presumably the partition key resolved
/// for the two message types needs to be the same for one saga instance - TODO confirm
/// against the endpoint's persistence configuration.
/// </remarks>
public class SnapshotScheduler : Saga<SnapshotState>,
    IAmStartedByMessages<SnapShotCreated>,
    IAmStartedByMessages<StopSnapshot>
{
    private readonly SnapshotOptions _snapshotOptions;

    // The original snippet assigned and used _logger without declaring it.
    private readonly ILogger<SnapshotScheduler> _logger;

    /// <summary>
    /// Creates the saga with its options and logger.
    /// </summary>
    /// <param name="snapshotOptions">Options wrapper; its <c>Value</c> supplies the snapshot delay.</param>
    /// <param name="logger">Logger used for informational messages.</param>
    public SnapshotScheduler(IOptions<SnapshotOptions> snapshotOptions, ILogger<SnapshotScheduler> logger)
    {
        _snapshotOptions = snapshotOptions.Value;
        _logger = logger;
    }

    /// <summary>
    /// Records the snapshot details on the saga data and evaluates the saga state.
    /// Test messages complete the saga immediately without any further processing.
    /// </summary>
    /// <param name="message">The received event.</param>
    /// <param name="context">The message handler context.</param>
    public async Task Handle(SnapShotCreated message, IMessageHandlerContext context)
    {
        Data.CorrelationId = message.CorrelationId;
        Data.SnapshotPartitionId = message.SnapshotPartitionId;
        Data.StartReceived = true;

        if (message.IsTest)
        {
            MarkAsComplete();
            return;
        }

        await ProcessState(context);
    }

    /// <summary>
    /// Logs the stop request, flags it on the saga data and evaluates the saga state.
    /// </summary>
    /// <param name="message">The received command.</param>
    /// <param name="context">The message handler context.</param>
    public Task Handle(StopSnapshot message, IMessageHandlerContext context)
    {
        _logger.LogInformation("Received StopSnapshot message:{Message}", JsonSerializer.Serialize(message));
        Data.StopReceived = true;
        return ProcessState(context);
    }

    /// <summary>
    /// Maps the <c>InstanceId</c> of both message types to the saga's <c>InstanceId</c>.
    /// </summary>
    /// <param name="mapper">The saga property mapper.</param>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SnapshotState> mapper)
    {
        mapper.MapSaga(saga => saga.InstanceId)
            .ToMessage<SnapShotCreated>(x => x.InstanceId)
            .ToMessage<StopSnapshot>(x => x.InstanceId);
    }

    /// <summary>
    /// Decides what to do based on the flags collected so far.
    /// </summary>
    /// <param name="context">The message handler context used to send the follow-up message.</param>
    private async Task ProcessState(IMessageHandlerContext context)
    {
        // Only a stop has been recorded so far: keep the saga open.
        if (!Data.StartReceived)
        {
            return;
        }

        if (Data.StopReceived)
        {
            _logger.LogInformation("Stopping the scheduler with data: {@Data}", Data);
        }
        else
        {
            // No stop recorded: send StartSnapshot after the configured delay.
            var sendOptions = new SendOptions();
            sendOptions.DelayDeliveryWith(TimeSpan.FromMinutes(_snapshotOptions.CreateSnapshotDelayInMin));

            var startSnapshot = new StartSnapshot
            {
                SnapshotPartitionId = Data.SnapshotPartitionId,
                InstanceId = Data.InstanceId,
                CorrelationId = Data.CorrelationId
            };

            await context.Send(startSnapshot, sendOptions);
        }

        // Both branches complete the saga.
        MarkAsComplete();
    }
}
/// <summary>
/// Saga data for <c>SnapshotScheduler</c>.
/// </summary>
public class SnapshotState : ContainSagaData
{
    /// <summary>Property the saga mapping uses to correlate incoming messages.</summary>
    public Guid InstanceId { get; set; }

    /// <summary>Set to <c>true</c> by the <c>StopSnapshot</c> handler.</summary>
    public bool StopReceived { get; set; }

    /// <summary>Set to <c>true</c> by the <c>SnapShotCreated</c> handler.</summary>
    public bool StartReceived { get; set; }

    /// <summary>Copied from the <c>SnapShotCreated</c> message.</summary>
    public Guid SnapshotPartitionId { get; set; }

    /// <summary>Copied from the <c>SnapShotCreated</c> message.</summary>
    public Guid CorrelationId { get; set; }
}
}
Error message:
info: NServiceBus.ImmediateRetry[0]
Immediate Retry is going to retry message ‘9297b3ad-fd71-4723-bcd3-af6c00206865’ because of an exception:
NServiceBus.Persistence.CosmosDB.TransactionalBatchOperationException: The ‘SnapshotState’ saga with id ‘3d28f72e-ab33-87e9-a65c-a50b6b9043fb’ can’t be completed. Response status code: BadRequest.
at NServiceBus.Persistence.CosmosDB.SagaSave.Conflict(TransactionalBatchOperationResult result) in /_/src/NServiceBus.Persistence.CosmosDB/Saga/SagaOperations.cs:line 74
at NServiceBus.Persistence.CosmosDB.TransactionalBatchExtensions.ExecuteOperationsAsync[TOperation](TransactionalBatch transactionalBatch, Dictionary`2 operationMappings, PartitionKeyPath partitionKeyPath, CancellationToken cancellationToken) in /_/src/NServiceBus.Persistence.CosmosDB/SynchronizedStorage/TransactionalBatchExtensions.cs:line 59
at NServiceBus.Persistence.CosmosDB.StorageSession.Commit(CancellationToken cancellationToken) in /_/src/NServiceBus.Persistence.CosmosDB/SynchronizedStorage/StorageSession.cs:line 62
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at ReceivePerformanceDiagnosticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Metrics/ProbeBuilders/ReceivePerformanceDiagnosticsBehavior.cs:line 18
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 44
at NServiceBus.Transport.AzureServiceBus.MessagePump.ProcessMessage(ServiceBusReceivedMessage message, ProcessMessageEventArgs processMessageEventArgs, String messageId, Dictionary`2 headers, BinaryData body, CancellationToken messageProcessingCancellationToken) in /_/src/Transport/Receiving/MessagePump.cs:line 285