Help needed in implementing AzureStorageSagaPersistence to store data in Azure Storage Table

I am using NServiceBus 5.x with AzureStorageSagaPersistence to store saga data in an Azure Storage table. The table is created, but no data is stored in it. Below is the sample code.

        var endPointName = ConfigurationManager.AppSettings["EndpointName"];
        configuration.EndpointName(endPointName);
        configuration.OverrideLocalAddress(endPointName);
        configuration.EnableFeature<AzureStorageSagaPersistence>();
        configuration.EnableFeature<AzureStorageSubscriptionPersistence>();
        configuration.EnableFeature<AzureStorageTimeoutPersistence>();
        var persistence = configuration.UsePersistence<AzureStoragePersistence, StorageType.Sagas>();
        persistence.ConnectionString("connectionstring");
        persistence.CreateSchema(true);

Hi, you don’t need these lines:

configuration.EnableFeature<AzureStorageSagaPersistence>();
configuration.EnableFeature<AzureStorageSubscriptionPersistence>();
configuration.EnableFeature<AzureStorageTimeoutPersistence>();

That is because the following line already enables all of them properly:

endpointConfiguration.UsePersistence<AzureStoragePersistence>();
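
For reference, here is a minimal sketch of what the saga persistence setup might look like without the EnableFeature calls. It assumes the NServiceBus 5.x BusConfiguration API and reuses only the extension methods already shown in the question (ConnectionString and CreateSchema). The method name ConfigureSagaPersistence, the variable name, and the "connectionstring" placeholder are illustrative, and the using directives are my best guess for the 5.x packages:

```csharp
using NServiceBus;
using NServiceBus.Persistence; // assumed home of StorageType in 5.x

public static class PersistenceSetup
{
    // Hypothetical helper; call it from your endpoint's Customize method.
    public static void ConfigureSagaPersistence(BusConfiguration configuration)
    {
        // UsePersistence is enough to turn on the Azure Storage saga persister;
        // no EnableFeature<AzureStorageSagaPersistence>() call is made here.
        var sagas = configuration.UsePersistence<AzureStoragePersistence, StorageType.Sagas>();
        sagas.ConnectionString("connectionstring"); // placeholder, as in the question
        sagas.CreateSchema(true);                   // same setting as in the question
    }
}
```

The same pattern would apply to StorageType.Subscriptions and StorageType.Timeouts if you want to configure them separately.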

Are you 100% sure the messages you send are arriving at the saga? Did you debug and check whether the handler code is executed?

Can you post your routing configuration, any message conventions, and your saga? You can leave out specific details of the saga, but I’d like to see which messages it handles. Please also verify that routing is set up correctly.

Hi,

If I am using configuration.EnableFeature();, then the breakpoint in void Handle(UploadImageMessage message) is not hit, and the saga data is not inserted into the Azure table.

Below is the code.

    class InventoryImageImportSaga :
        Saga<InventoryImageSagaData>,
        IAmStartedByMessages<StartImportInventoryImagesMessage>,
        IHandleMessages<UploadImageMessage>, 
        IHandleTimeouts<CancelImageUploadProcessMessage>
    {
        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<InventoryImageSagaData> mapper)
        {
            mapper.ConfigureMapping<UploadImageMessage>(message => $"{message.InventoryId}_{message.SecondaryKey}")
                .ToSaga(sagaData => sagaData.InventorySagaId);
        }

        public void Handle(StartImportInventoryImagesMessage message)
        {
            this.Data.ImportImageUrls = message.ImportImageUrls;
            RequestTimeout<CancelImageUploadProcessMessage>(TimeSpan.FromMinutes(1));
            var response = new UploadImageMessage
            {
                InventoryId = message.InventoryId,
                SecondaryKey = secondaryKey,
                RetryCount = 3
            };
            Bus.Send(Bus.CurrentMessageContext.ReplyToAddress, Bus.CurrentMessageContext.Id, response);
        }
        public void Handle(UploadImageMessage message)
        {
            MarkAsComplete();
        }


        public void Timeout(CancelImageUploadProcessMessage state)
        {
            MarkAsComplete();
        }
    }

Chetan,

I couldn’t find the definitions of IAmStartedByMessages, IHandleMessages, and IHandleTimeouts. Looking at the code, it seems that you handle StartImportInventoryImagesMessage, which immediately sends UploadImageMessage, which in turn marks the saga as completed regardless of your timeout request.

Could you possibly describe what you are trying to achieve from a logical standpoint? I suspect that your saga completes right after it receives StartImportInventoryImagesMessage, and thus there’s no data in the storage. You could verify that by running locally and setting a breakpoint in Handle(UploadImageMessage message).
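
To illustrate the reasoning, here is a small self-contained sketch of the lifecycle I suspect you are seeing. It is a simplified model only: FakeSagaStore is a made-up in-memory stand-in, not the NServiceBus persister or the Azure Storage API, and the saga id and data strings are invented for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a saga table; NOT the real persister.
class FakeSagaStore
{
    private readonly Dictionary<string, string> rows = new Dictionary<string, string>();

    // Called after a handler finishes and the saga is still active.
    public void Save(string sagaId, string data) { rows[sagaId] = data; }

    // Called when a handler marks the saga as complete: the row is removed.
    public void Complete(string sagaId) { rows.Remove(sagaId); }

    public int Count { get { return rows.Count; } }
}

static class Program
{
    static void Main()
    {
        var store = new FakeSagaStore();

        // The start message is handled, so the saga data is persisted.
        store.Save("inventory-1", "image urls");
        Console.WriteLine("After start message: " + store.Count + " row(s)");

        // The follow-up message arrives moments later and its handler
        // completes the saga, which removes the stored data again.
        store.Complete("inventory-1");
        Console.WriteLine("After completion: " + store.Count + " row(s)");
    }
}
```

If the real saga behaves like this model, the table would hold a row only for the short window between the two messages, which could easily look like the data was never written.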

Sean

Hi,

Below is my code. Please let me know where I am making a mistake. The saga table is created in Azure, but the saga data is not inserted.

    public class EndpointConfig : IConfigureThisEndpoint, AsA_Server
    {
        public void Customize(BusConfiguration configuration)
        {
            NServiceBus.Logging.LogManager.Use<NLogFactory>();

            var endPointName = ConfigurationManager.AppSettings["EndpointName"];
            configuration.EndpointName(endPointName);
            configuration.OverrideLocalAddress(endPointName);

            configuration.AssembliesToScan(AllAssemblies.Matching("NServiceBus").And("Carzing.Image.NSBSaga"));
            configuration.DoNotCreateQueues();
            configuration.UseTransport<AzureStorageQueueTransport>();

            configuration.UseSerialization<JsonSerializer>();

            configuration.EnableFeature<AzureStorageSagaPersistence>();
            configuration.EnableFeature<AzureStorageSubscriptionPersistence>();
            configuration.EnableFeature<AzureStorageTimeoutPersistence>();

            var persistence = configuration.UsePersistence<AzureStoragePersistence, StorageType.Sagas>();
            persistence.ConnectionString("ConnectionString");
            persistence.CreateSchema(true);
			
            var persistence1 = configuration.UsePersistence<AzureStoragePersistence, StorageType.Subscriptions>();
            persistence1.ConnectionString("ConnectionString");
            persistence1.TableName("Subscription");
            persistence1.CreateSchema(true);

            var persistence2 = configuration.UsePersistence<AzureStoragePersistence, StorageType.Timeouts>();
            persistence2.ConnectionString("ConnectionString");
            persistence2.CreateSchema(true);
            persistence2.TimeoutManagerDataTableName("TimeoutManagerDataTable2");
            persistence2.TimeoutDataTableName("TimeoutDataTableName2");
            persistence2.CatchUpInterval(10);
            persistence2.PartitionKeyScope("yyyy-MM-dd-HH");
        }
    }

    class InventoryImageImportSaga :
        Saga<InventoryImageSagaData>,
        IAmStartedByMessages<StartImportInventoryImagesMessage>,
        IHandleMessages<UploadImageMessage>, 
        IHandleTimeouts<CancelImageUploadProcessMessage>
    {
        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<InventoryImageSagaData> mapper)
        {
            mapper.ConfigureMapping<UploadImageMessage>(message => $"{message.InventoryId}_{message.SecondaryKey}")
                .ToSaga(sagaData => sagaData.InventorySagaId);
        }

        public void Handle(StartImportInventoryImagesMessage message)
        {
            this.Data.ImportImageUrls = message.ImportImageUrls;
            RequestTimeout<CancelImageUploadProcessMessage>(TimeSpan.FromMinutes(1));
            var response = new UploadImageMessage
            {
                InventoryId = message.InventoryId,
                SecondaryKey = secondaryKey,
                RetryCount = 3
            };
            Bus.Send(Bus.CurrentMessageContext.ReplyToAddress, Bus.CurrentMessageContext.Id, response);
        }
        public void Handle(UploadImageMessage message)
        {
            MarkAsComplete();
        }


        public void Timeout(CancelImageUploadProcessMessage state)
        {
            //read Saga instance and check current image upload process
            MarkAsComplete();
            // Queue message to update DCKiss status . DCInventory Status Update Message. 
        }
    }

Chetan,

In my previous comment I’ve outlined a possible reason why you don’t see the data. Have you investigated that route?

Sean,

I am able to debug the code in Handle(UploadImageMessage message). Still, the saga data is not updated in Azure Storage.

Below is the snippet from App.Config

Let me see if I understand what you’ve got so far.

  1. The endpoint running the saga is not the endpoint handling UploadImageMessage - is that correct? If it’s not, could
    you please describe what your solution looks like? Ideally, if you could share a slimmed down version, it would be perfect.
  2. Could you list versions of the packages you’re using?

Thank you,
Sean

Hi Sean,

Chetan and I are having a similar problem and are working on it together. Please find the information below.

  1. The endpoint that runs the saga and the endpoint that handles UploadImageMessage are the same.
  2. Package file
  <package id="Microsoft.Data.Edm" version="5.7.0" targetFramework="net451" />
  <package id="Microsoft.Data.OData" version="5.7.0" targetFramework="net451" />
  <package id="Microsoft.Data.Services.Client" version="5.7.0" targetFramework="net451" />
  <package id="Microsoft.WindowsAzure.ConfigurationManager" version="2.0.1.0" targetFramework="net451" />
  <package id="Newtonsoft.Json" version="9.0.1" targetFramework="net451" />
  <package id="NLog" version="3.2.1" targetFramework="net451" />
  <package id="NLog.Extended" version="3.2.1" targetFramework="net451" />
  <package id="NLog.Targets.NowGelf" version="1.0.5" targetFramework="net451" />
  <package id="NServiceBus" version="5.2.8" targetFramework="net451" />
  <package id="NServiceBus.Azure" version="6.2.7" targetFramework="net451" />
  <package id="NServiceBus.Azure.Transports.WindowsAzureStorageQueues" version="6.2.1" targetFramework="net451" />
  <package id="NServiceBus.Host" version="6.0.0" targetFramework="net451" />
  <package id="NServiceBus.NLog" version="1.0.0" targetFramework="net451" />
  <package id="System.Spatial" version="5.7.0" targetFramework="net451" />
  <package id="WindowsAzure.Storage" version="4.3.0" targetFramework="net451" />

A similar post is available here:

Please check it.

I was not able to upload the sample code to this thread.
Thanks,

Thank you Saravanakumar,

I suggest consolidating the two issues into one. Could you please send your zip file to sean.feldman@particular.net and include the email address associated with your company? I’d like to open a support case to look into this.

Thank you,
Sean

I followed up with sample code that implements a similar workflow.
The stripped-down code works as expected.
SagaWithAzurePersistence.zip (7.2 KB)