Summary: Given that endpoint name is different than its actual queue name, can we use just endpoint name to register endpoint with Bridge ?
Scenario:
I am trying to use bridge for migrating our applications queues from RabbitMQ to Azure ServiceBus
We want to have capability for developers to test/run their code in without stepping on each others toes. So in our development ASB environment, we want to add identifier with each endpoints queue name so they are unique to each developer.
For example ,
Consider EndpointA is sending message to EndpointB,
When Dev1 runs the EndpointA and EndpointB for local testing , in azure service bus their queue should show up as dev1_EndpointA and dev1_EndpointB
When Dev2 runs the EndpointA and EndpointB for local testing , in azure service bus their queue should show up as dev2_EndpointA and dev2_EndpointB
Desired result: At runtime detect that since dev1 is running app in his local environment, messages from dev1_endpointA should be routed to dev1_endpointB.
How to achieve this with Bridge?
What I have tried : Updated a sample project to create bridge. In each endpoint, while initializing EndpointConfiguration() I am giving a name thats different than the one Im adding in OverrideLocalAddress().
I am currently running into this exception,
fail: MessageShovelErrorHandlingPolicy[0] Message shovel operation failed, message will be moved to bridge.error System.Exception: Failed to shovel message for endpoint RightReceiver with id 646665ef-f188-4c96-9ce2-5e70b1119f08 from learning to right-side —> System.Exception: No target address mapping could be found for source address: Samples.Bridge.LeftSender. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: LeftSender at EndpointRegistry.TranslateToTargetAddress(String sourceAddress) in //src/NServiceBus.MessagingBridge/EndpointRegistry.cs:line 67 at MessageShovel.d__1.MoveNext() in //src/NServiceBus.MessagingBridge/MessageShovel.cs:line 58 — End of inner exception stack trace — at MessageShovel.d__1.MoveNext() in //src/NServiceBus.MessagingBridge/MessageShovel.cs:line 74 — End of stack trace from previous location where exception was thrown — at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at NServiceBus.LearningTransportMessagePump.d__23.MoveNext() in //src/NServiceBus.Core/Transports/Learning/LearningTransportMessagePump.cs:line 340
I have updated this sample code like shown below, major changes are endpoint name is different and i am using OverrideLocalAddress to mention queue name
Bridge code
static async Task Main()
{
Console.Title = "Samples.Bridge";
var leftReceiverEndptName = "LeftReceiver";
//var leftReceiverQName = "Samples.Bridge.LeftReceiver";
var leftSenderEndptName = "LeftSender";
//var leftSenderQName = "Samples.Bridge.LeftSender";
var rightReceiverEndptName = "RightReceiver";
//var rightReceiverQName = "Samples.Bridge.RightReceiver";
await Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddSimpleConsole(options =>
{
options.IncludeScopes = false;
options.SingleLine = true;
options.TimestampFormat = "hh:mm:ss ";
});
})
.UseNServiceBusBridge((ctx, bridgeConfiguration) =>
{
#region endpoint-adding-simple
var learningLeft = new BridgeTransport(new LearningTransport());
learningLeft.HasEndpoint(leftSenderEndptName);
#endregion
var learningTransport = new LearningTransport
{
// Set storage directory and add the character '2' to simulate a different transport.
StorageDirectory = $"{LearningTransportInfrastructure.FindStoragePath()}2"
};
var learningRight = new BridgeTransport(learningTransport)
{
// A different name is required if transports are used twice.
Name = "right-side"
};
#region endpoint-adding-register-publisher-by-string
var rightReceiver = new BridgeEndpoint(rightReceiverEndptName);
rightReceiver.RegisterPublisher("OrderReceived", rightReceiverEndptName);
#endregion
learningRight.HasEndpoint(rightReceiver);
#region add-transports-to-bridge
bridgeConfiguration.AddTransport(learningLeft);
bridgeConfiguration.AddTransport(learningRight);
#endregion
})
.Build()
.RunAsync();
}
LeftReceiver
static async Task Main()
{
var endptName = "LeftReceiver";
var qName = "Samples.Bridge.LeftReceiver";
Console.Title = endptName;
var endpointConfiguration = new EndpointConfiguration(endptName);
endpointConfiguration.OverrideLocalAddress(qName);
endpointConfiguration.UsePersistence<LearningPersistence>();
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.UseTransport(new LearningTransport());
endpointConfiguration.Conventions().DefiningMessagesAs(t => t.Name == "OrderResponse");
endpointConfiguration.Conventions().DefiningEventsAs(t => t.Name == "OrderReceived");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
var endpointInstance = await Endpoint.Start(endpointConfiguration);
Console.WriteLine("Press any key to exit");
Console.ReadKey();
await endpointInstance.Stop();
}
LeftSender
static async Task Main()
{
var endptName = "LeftSender";
var qName = "Samples.Bridge.LeftSender";
Console.Title = endptName;
var rightReceiverEndptName = "RightReceiver";
var endpointConfiguration = new EndpointConfiguration(endptName);
endpointConfiguration.UsePersistence<LearningPersistence>();
endpointConfiguration.OverrideLocalAddress(qName);
endpointConfiguration.Conventions().DefiningCommandsAs(t => t.Name == "PlaceOrder");
endpointConfiguration.Conventions().DefiningMessagesAs(t => t.Name == "OrderResponse");
endpointConfiguration.Conventions().DefiningEventsAs(t => t.Name == "OrderReceived");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
var routing = endpointConfiguration.UseTransport(new LearningTransport());
routing.RouteToEndpoint(typeof(PlaceOrder), rightReceiverEndptName);
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
var endpointInstance = await Endpoint.Start(endpointConfiguration);
await Start(endpointInstance);
await endpointInstance.Stop();
}
static async Task Start(IEndpointInstance endpointInstance)
{
Console.WriteLine("Press '1' to send the PlaceOrder command");
Console.WriteLine("Press '2' to publish the OrderReceived event");
Console.WriteLine("Press 'esc' other key to exit");
while (true)
{
var key = Console.ReadKey();
Console.WriteLine();
var orderId = Guid.NewGuid();
switch (key.Key)
{
case ConsoleKey.D1:
case ConsoleKey.NumPad1:
var placeOrder = new PlaceOrder
{
OrderId = orderId
};
await endpointInstance.Send(placeOrder);
Console.WriteLine($"Send PlaceOrder Command with Id {orderId}");
break;
case ConsoleKey.D2:
case ConsoleKey.NumPad2:
var orderReceived = new OrderReceived
{
OrderId = orderId
};
await endpointInstance.Publish(orderReceived);
Console.WriteLine($"Published OrderReceived Event with Id {orderId}.");
break;
case ConsoleKey.Escape:
return;
}
}
}
RightReceiver
static async Task Main()
{
var endptName = "RightReceiver";
var qName = "Samples.Bridge.RightReceiver";
Console.Title = endptName;
var endpointConfiguration = new EndpointConfiguration(endptName);
endpointConfiguration.UsePersistence<LearningPersistence>();
endpointConfiguration.OverrideLocalAddress(qName);
endpointConfiguration.Conventions().DefiningCommandsAs(t => t.Name == "PlaceOrder");
endpointConfiguration.Conventions().DefiningMessagesAs(t => t.Name == "OrderResponse");
endpointConfiguration.Conventions().DefiningEventsAs(t => t.Name == "OrderReceived");
#region alternative-learning-transport
var learningTransportDefinition = new LearningTransport
{
// Set storage directory and add the character '2' to simulate a different transport.
StorageDirectory = $"{LearningTransportInfrastructure.FindStoragePath()}2"
};
endpointConfiguration.UseTransport(learningTransportDefinition);
#endregion
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
var endpointInstance = await Endpoint.Start(endpointConfiguration);
Console.WriteLine("Press any key to exit");
Console.ReadKey();
await endpointInstance.Stop();
}