MongoDb, SQS, No item found in behavior context with key

Hey all,

Bit of a shout out as I’m banging my head against a wall.

NServiceBus: 7.3.0
NServiceBus.AmazonSQS: 5.0.0
NServiceBus.Extensions.Hosting: 1.0.1
NServiceBus.Storage.MongoDB: 2.2.0

I have a Saga which successfully initialises the saga data in MongoDb after receiving a start command. As part of the handler that starts the saga it sends a message (locally) to another handler, which uses a TCP/IP API to communicate with a third-party i.e. sends a message to the third-party over a socket.

A few seconds later a response from the TCP/IP API is received, the content is mapped to an ICommand message and that message is sent locally. This ICommand object is picked up by a Saga Handler on the saga and sets some state fields.

When the Saga Handler finishes processing the response command the following exception is thrown:

System.Collections.Generic.KeyNotFoundException: No item found in behavior context with key: OrderManagement.NewOrderSingleSagaData
   at NServiceBus.Extensibility.ContextBag.Get[T](String key)
   at NServiceBus.Storage.MongoDB.StorageSession.RetrieveVersion(Type type)
   at NServiceBus.Storage.MongoDB.SagaPersister.Update(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context)
   at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.ScheduledTaskHandlingBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next)
   at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext)
   at NServiceBus.Transport.SQS.MessagePump.ProcessMessageWithInMemoryRetries(Dictionary`2 headers, String nativeMessageId, Byte[] body, CancellationToken token)
Exception details:
        Message ID: a8b2f921-c345-4091-931f-ac0000cf4f4f
        Transport message ID: 1dc857d1-9398-44df-9bfc-d25381098e77

Has anyone come across this before, or can you point me in the right direction?

Cheers,
Marcus

Some notes for the investigation:

Looking through the NServiceBus.Storage.MongoDB it appears this issue occurs when using IFindSaga to obtain the saga data using different properties.

The NServiceBus.Storage.MongoDB library uses the ContextBag to store the “_version” of the saga record to help with conflicts:

     async Task<TSagaData> GetSagaData<TSagaData>(string elementName, object elementValue, SynchronizedStorageSession session)
    {
        var storageSession = (StorageSession)session;

        var document = await storageSession.Find<TSagaData>(new BsonDocument(elementName, BsonValue.Create(elementValue))).ConfigureAwait(false);

        if (document != null)
        {
            var version = document.GetValue(versionElementName);

            // @@@ Stores the version in to the ContextBag

            storageSession.StoreVersion<TSagaData>(version.AsInt32);

            return BsonSerializer.Deserialize<TSagaData>(document);
        }

        return default;
    }

Then when the saga data is updated the version is retrieved from the ContextBag to use when calling the update:

    public async Task Update(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context)
    {
        var storageSession = (StorageSession)session;
        var sagaDataType = sagaData.GetType();

        // @@@ Get the version of the saga data from the context
        var version = storageSession.RetrieveVersion(sagaDataType);
        var document = sagaData.ToBsonDocument().SetElement(new BsonElement(versionElementName, version + 1));

        // @@@ Use the version when doing the update to ensure there hasn't been a subsequent update

        var result = await storageSession.ReplaceOneAsync(sagaDataType, filterBuilder.Eq(idElementName, sagaData.Id) & filterBuilder.Eq(versionElementName, version), document).ConfigureAwait(false);

        if (result.ModifiedCount != 1)
        {
            throw new Exception($"The '{sagaDataType.Name}' saga with id '{sagaData.Id}' was updated by another process or no longer exists.");
        }
    }

When using a custom finder, as the NServiceBus.Storage.MongoDB.StorageSession is inaccessible we can’t cast SynchronizedStorageSession to call the StoreVersion();

    public async Task<NewOrderSingleSagaData> FindBy(SendOrderRejectedResponse message, SynchronizedStorageSession storageSession, ReadOnlyContextBag context){
        var session = storageSession.GetClientSession();
        var sagaDataCollection = session.Client
            .GetServiceDatabase()
            .GetSagaCollection<NewOrderSingleSagaData>();
        var newOrderSingleSagaDataFilter = Builders<NewOrderSingleSagaData>
            .Filter.Eq( saga => saga.SagaReference, message.RequestorReference );
        var newOrderSingleSagaDataCursor = await sagaDataCollection.FindAsync(newOrderSingleSagaDataFilter);
        var newOrderSingleSagaData = await newOrderSingleSagaDataCursor.FirstOrDefaultAsync();

        return newOrderSingleSagaData;
    }

So, is there a way of adding data to the ContextBag? Or a way of getting at the StoreVersion?

… digging continues :smiley:

Marcus:

Yes, because we are “safe by default” we didn’t provide access to StoreVersion or the ContextBag.

The persistence was released as feature minimal as possible, hoping good folks like you would point out what we are potentially missing.

Would you be willing to explain why you are using a custom saga finder and perhaps even share it? This would help us understand and evaluate your use case and figure out the best way forward to help you.

If you cannot share your code or scenario publicly (which is understandable) but are still willing to do so privately, please send us an email with the details to support@particular.net.

Hey Bob,

Thanks for getting back to me, and completely understand the approach.

I can’t share the code but I can share the scenario, and I’ll see if I can get a reduced PoC sent over.

The scenario is, we have a service that needs to track the status of requests to a third party, using their API, where they have a limit of 16 characters for the references but we’re using GUIDs internally.

The service starts a saga after receiving a command that contains the internal GUID reference. This GUI reference is used as the unique key for the saga to minimise the chances of duplicates. As part of processing this message we generate a compatible external reference which is sent to the third party over TCP/IP. The third party will process our request and send back a response (at some point in the future) which contains the compatible reference. We map this response back to a command and send it to the saga for processing. However, as we can’t search for a different field on the Saga we needed the custom IFindSaga code.

Hope that makes sense.

Cheers,
Marcus

Hi Marcus

If you implement the saga finder you need to add it to the context bag yourself. Have you tried something like this?



public class MySagaFinder :
    IFindSagas<MySagaData>.Using<MyMessage>
{
    public Task<MySagaData> FindBy(MyMessage message, SynchronizedStorageSession storageSession, ReadOnlyContextBag context)
    {
       // load the document
       var version = document.GetValue(versionElementName); // by default _version unless overwritten
       contextBag.Set(typeof(MySagaData).FullName, version);
    }
}

Regards
Daniel

Hey Daniel,

Thanks for this, no I’ve not tried it but will give it a go when I can.

Unfortunately, I might have to scrap using the MongoDb persistence as I was hoping to use AWS DocumentDb but it doesn’t appear to be compatible due to AWS DocumentDb not supporting Sessions.

If I do get chance, I’ll feedback here.

Cheers,
Marcus