NServiceBus.Persistence.CosmosDB.TransactionalBatchOperationException: The saga with id can't be completed. Response status code: BadRequest

Hi Team,

I am writing a saga that sometimes receives the StopSnapshot command. If it is received, the saga has to wait for the SnapShotCreated event before stopping further execution. If SnapShotCreated arrives first, the saga should continue the execution. When I try to do this, I receive the error below. Any help is highly appreciated.

Code:

public class SnapshotScheduler : Saga<SnapshotState>, IAmStartedByMessages<SnapShotCreated>,
    IAmStartedByMessages<StopSnapshot>
{
    private readonly SnapshotOptions _snapshotOptions;
    private readonly ILogger<SnapshotScheduler> _logger;

    public SnapshotScheduler(IOptions<SnapshotOptions> snapshotOptions, ILogger<SnapshotScheduler> logger)
    {
        _snapshotOptions = snapshotOptions.Value;
        _logger = logger;
    }

    public async Task Handle(SnapShotCreated message, IMessageHandlerContext context)
    {
        Data.CorrelationId = message.CorrelationId;
        Data.SnapshotPartitionId = message.SnapshotPartitionId;
        Data.StartReceived = true;
        if (message.IsTest)
        {
            MarkAsComplete();
            return;
        }

        await ProcessState(context);
    }

    public Task Handle(StopSnapshot message, IMessageHandlerContext context)
    {
        _logger.LogInformation("Received StopSnapshot message:{Message}", JsonSerializer.Serialize(message));
        Data.StopReceived = true;
        return ProcessState(context);
    }

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SnapshotState> mapper)
    {
        mapper.MapSaga(saga => saga.InstanceId)
            .ToMessage<SnapShotCreated>(x => x.InstanceId)
            .ToMessage<StopSnapshot>(x => x.InstanceId);
    }

    private async Task ProcessState(IMessageHandlerContext context)
    {
        if (Data.StartReceived)
        {
            if (Data.StopReceived)
            {
                _logger.LogInformation("Stopping the scheduler with data: {@Data}", Data);
            }
            else
            {
                var sendOptions = new SendOptions();
                sendOptions.DelayDeliveryWith(TimeSpan.FromMinutes(_snapshotOptions.CreateSnapshotDelayInMin));
                await context.Send(
                    new StartSnapshot { SnapshotPartitionId = Data.SnapshotPartitionId, InstanceId = Data.InstanceId, CorrelationId = Data.CorrelationId }, sendOptions);
            }
            MarkAsComplete();
        }
    }
}

public class SnapshotState : ContainSagaData
{
    public Guid InstanceId { get; set; }
    public bool StopReceived { get; set; }
    public bool StartReceived { get; set; }
    public Guid SnapshotPartitionId { get; set; }
    public Guid CorrelationId { get; set; }
}

}

Error message:

info: NServiceBus.ImmediateRetry[0]
Immediate Retry is going to retry message ‘9297b3ad-fd71-4723-bcd3-af6c00206865’ because of an exception:
NServiceBus.Persistence.CosmosDB.TransactionalBatchOperationException: The ‘SnapshotState’ saga with id ‘3d28f72e-ab33-87e9-a65c-a50b6b9043fb’ can’t be completed. Response status code: BadRequest.
at NServiceBus.Persistence.CosmosDB.SagaSave.Conflict(TransactionalBatchOperationResult result) in /_/src/NServiceBus.Persistence.CosmosDB/Saga/SagaOperations.cs:line 74
at NServiceBus.Persistence.CosmosDB.TransactionalBatchExtensions.ExecuteOperationsAsync[TOperation](TransactionalBatch transactionalBatch, Dictionary`2 operationMappings, PartitionKeyPath partitionKeyPath, CancellationToken cancellationToken) in /_/src/NServiceBus.Persistence.CosmosDB/SynchronizedStorage/TransactionalBatchExtensions.cs:line 59
at NServiceBus.Persistence.CosmosDB.StorageSession.Commit(CancellationToken cancellationToken) in /_/src/NServiceBus.Persistence.CosmosDB/SynchronizedStorage/StorageSession.cs:line 62
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at ReceivePerformanceDiagnosticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Metrics/ProbeBuilders/ReceivePerformanceDiagnosticsBehavior.cs:line 18
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 44
at NServiceBus.Transport.AzureServiceBus.MessagePump.ProcessMessage(ServiceBusReceivedMessage message, ProcessMessageEventArgs processMessageEventArgs, String messageId, Dictionary`2 headers, BinaryData body, CancellationToken messageProcessingCancellationToken) in /_/src/Transport/Receiving/MessagePump.cs:line 285

Hi Pon,

What version of NServiceBus and the CosmosDB persistence are you using? Have you made sure the partition key is mapped? Can you maybe also share the mapping code?
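To illustrate what "mapping the partition key" could look like, here is a minimal sketch. It assumes a version of the CosmosDB persistence that exposes the TransactionInformation API, and it reuses the SnapShotCreated and StopSnapshot message types from the post above. The helper class, the database name, the container name, and the partition key path are all hypothetical placeholders, not values taken from your setup.

```csharp
using Microsoft.Azure.Cosmos;
using NServiceBus;

// Hypothetical helper class; only the persistence calls matter here.
public static class CosmosPersistenceSetup
{
    public static void Configure(EndpointConfiguration endpointConfiguration, string connectionString)
    {
        var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
        persistence.CosmosClient(new CosmosClient(connectionString));

        // Placeholder names: replace with the real database, container, and partition key path.
        persistence.DatabaseName("SnapshotDb");
        persistence.DefaultContainer("Sagas", "/partitionKey");

        // Tell the persistence how to derive the partition key from each message
        // that can start or continue the saga. Using InstanceId is an assumption
        // based on the saga mapping shown in the post.
        var transactionInformation = persistence.TransactionInformation();
        transactionInformation.ExtractPartitionKeyFromMessage<SnapShotCreated>(
            message => new PartitionKey(message.InstanceId.ToString()));
        transactionInformation.ExtractPartitionKeyFromMessage<StopSnapshot>(
            message => new PartitionKey(message.InstanceId.ToString()));
    }
}
```

Both messages resolve to the same partition key value for a given InstanceId, so the saga document is read and written in one logical partition regardless of which message arrives first.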

Thanks
Daniel

Thanks Daniel. This issue was due to sharing one container between two endpoints. I was able to resolve it by creating a separate container for each endpoint.

Thanks, Pon

Hi Pon,

Not sure I follow. It should be possible to have multiple endpoints using the same container. As long as each item has a unique combination of partition key and identifier, everything can be placed in the same container. That should not cause any issues.

Can you maybe describe your scenario in a bit more detail? Do both endpoints have the same saga? Is this a scaled-out endpoint or two different ones?

Regards,
Daniel

Let me find the old sample and share it with you. Sorry for the delayed response.

Thanks, Pon