Hi,
We are finding that, by default, we are unable to process messages for multiple sagas at the same time. Specifically, we have two saga instances, each of which wants to process a message; however, one saga is blocking the other.
Reproduction:
Take the sample for SQL Server Transport and SQL Persistence and change it so that the saga keeps sending messages to itself, and the handler for that message takes some time to complete. The handler also logs how many messages are in the handler at that moment.
To do that, simply replace the contents of OrderLifecycleSaga.cs with:
public class OrderLifecycleSaga(ILogger<OrderLifecycleSaga> logger) :
    Saga<OrderLifecycleSaga.SagaData>,
    IAmStartedByMessages<OrderSubmitted>,
    IHandleTimeouts<OrderTimeout>,
    IHandleMessages<LetsDoIt>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<OrderSubmitted>(msg => msg.OrderId)
            .ToMessage<LetsDoIt>(msg => msg.OrderId);
    }

    public async Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        logger.LogInformation("Started!");
        await context.Publish(new LetsDoIt(message.OrderId));
    }

    public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
    {
        logger.LogInformation("Got timeout");
        return Task.CompletedTask;
    }

    public class SagaData :
        ContainSagaData
    {
        public Guid OrderId { get; set; }
    }

    // Shared across all saga instances: how many LetsDoIt messages are being handled right now.
    private static int countInHandler = 0;

    public async Task Handle(LetsDoIt message, IMessageHandlerContext context)
    {
        Interlocked.Increment(ref countInHandler);
        try
        {
            logger.LogInformation("{GUID} is in handler in total {CountInHandler} are in Handler", message.OrderId, countInHandler);
            // Simulate slow work (3 x 3 seconds), logging the concurrent count after each step.
            for (int i = 0; i < 3; i++)
            {
                await Task.Delay(3000);
                logger.LogInformation("{GUID} is in handler in total {CountInHandler} are in Handler", message.OrderId, countInHandler);
            }
            logger.LogInformation("{GUID} is in handler in total {CountInHandler} are in Handler", message.OrderId, countInHandler);
            // Keep the loop going by sending the saga another message.
            await context.Publish(new LetsDoIt(message.OrderId));
        }
        finally
        {
            Interlocked.Decrement(ref countInHandler);
        }
    }
}

public class LetsDoIt(Guid orderId) : IEvent
{
    public Guid OrderId { get; set; } = orderId;
}
Now start the sender and receiver.
Send two messages from the sender.
We will see that at any one time only one message is in the handler, and the other saga is blocked on the DB.
Looking at the DB, page locks show up on the index.
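One way to look at those locks is to query sys.dm_tran_locks while both messages are in flight. This is only a minimal sketch and not part of the sample; it assumes it is run in the receiver's database by a login that is allowed to read the lock DMVs (VIEW SERVER STATE on a regular SQL Server instance).

```sql
-- Sketch: list current lock requests in this database and, where possible,
-- resolve them to the table and index they belong to.
SELECT
    tl.request_session_id,
    tl.resource_type,                      -- e.g. PAGE, KEY, RID, OBJECT
    tl.request_mode,                       -- e.g. S, U, X, IX
    tl.request_status,                     -- GRANT, WAIT or CONVERT
    OBJECT_NAME(p.object_id) AS table_name,
    i.name                   AS index_name
FROM sys.dm_tran_locks AS tl
LEFT JOIN sys.partitions AS p
    -- For PAGE/KEY/RID/HOBT locks the associated entity id is a hobt_id.
    ON p.hobt_id = tl.resource_associated_entity_id
   AND tl.resource_type IN ('PAGE', 'KEY', 'RID', 'HOBT')
LEFT JOIN sys.indexes AS i
    ON i.object_id = p.object_id
   AND i.index_id  = p.index_id
WHERE tl.resource_database_id = DB_ID()
  AND tl.resource_type <> 'DATABASE'
ORDER BY tl.request_session_id, tl.resource_type;
```

Rows with request_status = WAIT are the blocked requests; the index_name column shows which index the contended resource belongs to.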
Fix
To fix this we did what was suggested here, which is to change the index on OrderId so that it includes all the other columns:
DROP INDEX [Index_Correlation_OrderId] ON [receiver].[OrderLifecycleSaga]
GO
CREATE UNIQUE NONCLUSTERED INDEX [Index_Correlation_OrderId] ON [receiver].[OrderLifecycleSaga]
(
[Correlation_OrderId] ASC
)
include ([Id]
,[Metadata]
,[Data]
,[PersistenceVersion]
,[SagaTypeVersion]
,[Concurrency])
WHERE ([Correlation_OrderId] IS NOT NULL)
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
GO
After running that, the number of messages in the handler at the same time jumps to 2.
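To double-check that the recreated index really carries the included columns, the catalog views can be queried. This is a sketch only, using the table and index names from the script above:

```sql
-- Sketch: list the key and included columns of Index_Correlation_OrderId.
SELECT
    i.name  AS index_name,
    c.name  AS column_name,
    ic.key_ordinal,                        -- 0 for included (non-key) columns
    ic.is_included_column
FROM sys.indexes AS i
JOIN sys.index_columns AS ic
    ON ic.object_id = i.object_id
   AND ic.index_id  = i.index_id
JOIN sys.columns AS c
    ON c.object_id = ic.object_id
   AND c.column_id = ic.column_id
WHERE i.object_id = OBJECT_ID(N'[receiver].[OrderLifecycleSaga]')
  AND i.name = N'Index_Correlation_OrderId'
ORDER BY ic.is_included_column, ic.key_ordinal;
```

With the fix applied, Correlation_OrderId should be the only key column and the remaining six columns should be listed with is_included_column = 1.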
Other notes
Change the PK to be OrderId
We also tried swapping the PK so that OrderId is the PK (with a unique nonclustered index on Id instead); however, that immediately resulted in deadlocks!
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[receiver].[OrderLifecycleSaga]') AND type in (N'U'))
DROP TABLE [receiver].[OrderLifecycleSaga]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [receiver].[OrderLifecycleSaga](
[Id] [uniqueidentifier] NOT NULL,
[Metadata] [nvarchar](max) NOT NULL,
[Data] [nvarchar](max) NOT NULL,
[PersistenceVersion] [varchar](23) NOT NULL,
[SagaTypeVersion] [varchar](23) NOT NULL,
[Concurrency] [int] NOT NULL,
[Correlation_OrderId] [uniqueidentifier] NOT NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
ALTER TABLE [receiver].[OrderLifecycleSaga] ADD PRIMARY KEY CLUSTERED
(
[Correlation_OrderId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
GO
CREATE UNIQUE NONCLUSTERED INDEX [Index_Id] ON [receiver].[OrderLifecycleSaga]
(
[Id] ASC
)
WHERE ([Id] IS NOT NULL)
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
GO
Use rowlock
We also tried to use rowlock with updlock, but that did not appear to help.
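For reference, a saga lookup with those hints would look roughly like the following. This is a hypothetical illustration, not the exact statement NServiceBus generates or the one we ran; the column list comes from the table definition above, and @propertyValue is a placeholder with a made-up value.

```sql
-- Hypothetical sketch of a saga lookup with UPDLOCK and ROWLOCK hints.
DECLARE @propertyValue uniqueidentifier = '00000000-0000-0000-0000-000000000001'; -- placeholder value

SELECT
    [Id],
    [SagaTypeVersion],
    [Concurrency],
    [Metadata],
    [Data]
FROM [receiver].[OrderLifecycleSaga] WITH (UPDLOCK, ROWLOCK)
WHERE [Correlation_OrderId] = @propertyValue;
```

Note that ROWLOCK is a hint about lock granularity on the rows the query touches; it does not change which index the optimizer uses to find them.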
Expected
By default, NServiceBus (NSB) should be able to process messages for multiple, unrelated saga instances at the same time.