I have a Saga which successfully initialises the saga data in MongoDb after receiving a start command. As part of the handler that starts the saga it sends a message (locally) to another handler, which uses a TCP/IP API to communicate with a third-party i.e. sends a message to the third-party over a socket.
A few seconds later a response from the TCP/IP API is received, the content is mapped to an ICommand message and that message is sent locally. This ICommand object is picked up by a Saga Handler on the saga and sets some state fields.
When the Saga Handler finishes processing the response command the following exception is thrown:
System.Collections.Generic.KeyNotFoundException: No item found in behavior context with key: OrderManagement.NewOrderSingleSagaData
at NServiceBus.Extensibility.ContextBag.Get[T](String key)
at NServiceBus.Storage.MongoDB.StorageSession.RetrieveVersion(Type type)
at NServiceBus.Storage.MongoDB.SagaPersister.Update(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context)
at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.ScheduledTaskHandlingBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next)
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext)
at NServiceBus.Transport.SQS.MessagePump.ProcessMessageWithInMemoryRetries(Dictionary`2 headers, String nativeMessageId, Byte[] body, CancellationToken token)
Exception details:
Message ID: a8b2f921-c345-4091-931f-ac0000cf4f4f
Transport message ID: 1dc857d1-9398-44df-9bfc-d25381098e77
Has anyone come across this before, or can you point me in the right direction?
Looking through the NServiceBus.Storage.MongoDB it appears this issue occurs when using IFindSaga to obtain the saga data using different properties.
The NServiceBus.Storage.MongoDB library uses the ContextBag to store the “_version” of the saga record to help with conflicts:
async Task<TSagaData> GetSagaData<TSagaData>(string elementName, object elementValue, SynchronizedStorageSession session)
{
var storageSession = (StorageSession)session;
var document = await storageSession.Find<TSagaData>(new BsonDocument(elementName, BsonValue.Create(elementValue))).ConfigureAwait(false);
if (document != null)
{
var version = document.GetValue(versionElementName);
// @@@ Stores the version in to the ContextBag
storageSession.StoreVersion<TSagaData>(version.AsInt32);
return BsonSerializer.Deserialize<TSagaData>(document);
}
return default;
}
Then when the saga data is updated the version is retrieved from the ContextBag to use when calling the update:
public async Task Update(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context)
{
var storageSession = (StorageSession)session;
var sagaDataType = sagaData.GetType();
// @@@ Get the version of the saga data from the context
var version = storageSession.RetrieveVersion(sagaDataType);
var document = sagaData.ToBsonDocument().SetElement(new BsonElement(versionElementName, version + 1));
// @@@ Use the version when doing the update to ensure there hasn't been a subsequent update
var result = await storageSession.ReplaceOneAsync(sagaDataType, filterBuilder.Eq(idElementName, sagaData.Id) & filterBuilder.Eq(versionElementName, version), document).ConfigureAwait(false);
if (result.ModifiedCount != 1)
{
throw new Exception($"The '{sagaDataType.Name}' saga with id '{sagaData.Id}' was updated by another process or no longer exists.");
}
}
When using a custom finder, as the NServiceBus.Storage.MongoDB.StorageSession is inaccessible we can’t cast SynchronizedStorageSession to call the StoreVersion();
public async Task<NewOrderSingleSagaData> FindBy(SendOrderRejectedResponse message, SynchronizedStorageSession storageSession, ReadOnlyContextBag context){
var session = storageSession.GetClientSession();
var sagaDataCollection = session.Client
.GetServiceDatabase()
.GetSagaCollection<NewOrderSingleSagaData>();
var newOrderSingleSagaDataFilter = Builders<NewOrderSingleSagaData>
.Filter.Eq( saga => saga.SagaReference, message.RequestorReference );
var newOrderSingleSagaDataCursor = await sagaDataCollection.FindAsync(newOrderSingleSagaDataFilter);
var newOrderSingleSagaData = await newOrderSingleSagaDataCursor.FirstOrDefaultAsync();
return newOrderSingleSagaData;
}
So, is there a way of adding data to the ContextBag? Or a way of getting at the StoreVersion?
Yes, because we are “safe by default” we didn’t provide access to StoreVersion or the ContextBag.
The persistence was released as feature minimal as possible, hoping good folks like you would point out what we are potentially missing.
Would you be willing to explain why you are using a custom saga finder and perhaps even share it? This would help us understand and evaluate your use case and figure out the best way forward to help you.
If you cannot share your code or scenario publicly (which is understandable) but are still willing to do so privately, please send us an email with the details to support@particular.net.
Thanks for getting back to me, and completely understand the approach.
I can’t share the code but I can share the scenario, and I’ll see if I can get a reduced PoC sent over.
The scenario is, we have a service that needs to track the status of requests to a third party, using their API, where they have a limit of 16 characters for the references but we’re using GUIDs internally.
The service starts a saga after receiving a command that contains the internal GUID reference. This GUI reference is used as the unique key for the saga to minimise the chances of duplicates. As part of processing this message we generate a compatible external reference which is sent to the third party over TCP/IP. The third party will process our request and send back a response (at some point in the future) which contains the compatible reference. We map this response back to a command and send it to the saga for processing. However, as we can’t search for a different field on the Saga we needed the custom IFindSaga code.
If you implement the saga finder you need to add it to the context bag yourself. Have you tried something like this?
public class MySagaFinder :
IFindSagas<MySagaData>.Using<MyMessage>
{
public Task<MySagaData> FindBy(MyMessage message, SynchronizedStorageSession storageSession, ReadOnlyContextBag context)
{
// load the document
var version = document.GetValue(versionElementName); // by default _version unless overwritten
contextBag.Set(typeof(MySagaData).FullName, version);
}
}
Thanks for this, no I’ve not tried it but will give it a go when I can.
Unfortunately, I might have to scrap using the MongoDb persistence as I was hoping to use AWS DocumentDb but it doesn’t appear to be compatible due to AWS DocumentDb not supporting Sessions.