Does this message represent a publisher or a subscriber?

msmq

(AK) #1
<?xml version="1.0"?>
<ArrayOfHeaderInfo xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<HeaderInfo>
		<Key>NServiceBus.ControlMessage</Key>
		<Value>True</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>NServiceBus.MessageIntent</Key>
		<Value>Subscribe</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>SubscriptionMessageType</Key>
		<Value>Messages.OrderPlaced, Messages, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>NServiceBus.ReplyToAddress</Key>
		<Value>Billing@server</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>NServiceBus.SubscriberAddress</Key>
		<Value>Billing@server</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>NServiceBus.SubscriberEndpoint</Key>
		<Value>Billing</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>NServiceBus.TimeSent</Key>
		<Value>2019-03-12 13:31:37:273789 Z</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>NServiceBus.Version</Key>
		<Value>7.1.6</Value>
	</HeaderInfo>
	<HeaderInfo>
		<Key>CorrId</Key>
		<Value />
	</HeaderInfo>
</ArrayOfHeaderInfo>

Does this message represent a publisher or a subscriber? I am asking because I see Subscribe in NServiceBus.MessageIntent.


(Dennis van der Stelt) #2

This is a subscription request from a subscriber, created by NServiceBus and sent to a publisher. This message is created because MSMQ doesn’t natively support pub/sub.
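
As a rough illustration of where such a message comes from, here is a minimal sketch of a subscriber endpoint on MSMQ. It assumes NServiceBus 7 with the NServiceBus.Transport.Msmq package. The publisher endpoint name "Sales", the stand-in OrderPlaced class, and the handler are assumptions for illustration and are not taken from the headers above. When an endpoint configured like this starts, it sends a Subscribe control message like the one shown to the publisher's input queue, and the publisher stores it in its subscription storage.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical stand-in for Messages.OrderPlaced from the headers above.
public class OrderPlaced : IEvent
{
}

// With auto-subscribe enabled (the default), having a handler for an event
// is what causes the endpoint to subscribe to it at startup.
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        return Task.CompletedTask;
    }
}

public static class BillingEndpoint
{
    public static EndpointConfiguration Create()
    {
        var configuration = new EndpointConfiguration("Billing");
        var transport = configuration.UseTransport<MsmqTransport>();

        // Tells the subscriber where to send its subscription request.
        // "Sales" is an assumed publisher endpoint name.
        transport.Routing().RegisterPublisher(typeof(OrderPlaced), "Sales");

        return configuration;
    }
}
```

In other words, an endpoint that calls RegisterPublisher for an event is acting as a subscriber for that event, even if it also publishes other events itself.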


(AK) #3

But I am supposed to be creating a publisher, so something went wrong. I am sharing my publisher code.

EndPointConfig

        private static IEnumerable<string> MessageNamespaces
        {
            get
            {
                return new[]
                {
                    typeof(PCounts).Namespace,
                    typeof(IInventoryCollectorMessage).Namespace
                };
            }
        }

        public void Customize(EndpointConfiguration configuration)
        {
            Contract.Requires<ArgumentNullException>(configuration != null, "configuration");

            Logger.Initialize(name => new Log4NetLogger(name));
            log4net.Config.XmlConfigurator.Configure();
            //LogManager.Use<Log4NetFactory>();
            ILog logger = LogManager.GetLogger(typeof(EndpointConfig));

            var builder = new ContainerBuilder();
            var container = builder.AddInstaller<InfrastructureInstaller>().Build();
            var settings = container.Resolve<IApplicationSettings>();
            EndPointInstance.Settings = settings;

            configuration.DefineEndpointName(settings.Endpoint);
            configuration.UseContainer<WindsorBuilder>(customizations => customizations.ExistingContainer(container));

            // Error Queue
            configuration.SendFailedMessagesTo(settings.ErrorQueue);
            configuration.Conventions().DefiningEventsAs(t => t.Namespace != null && MessageNamespaces.Contains(t.Namespace));
            configuration.UseSerialization<XmlSerializer>();
            var transport = configuration.UseTransport<MsmqTransport>().TransactionScopeOptions(
                timeout: TimeSpan.FromMinutes(settings.TransactionTimeoutMinutes));

            // First level retries
            configuration.Recoverability().Immediate(
                customizations: immediate =>
                {
                    immediate.NumberOfRetries(settings.MaxNumberOfFirstLevelRetries);
                });

            // Maximum concurrency level
            configuration.LimitMessageProcessingConcurrencyTo(settings.MaximumConcurrencyLevel);

            var recoverability = configuration.Recoverability();
            configuration.EnableInstallers();
            configuration.EnableFeature<EndPointInstance>();

            // Second level retries
            recoverability.Delayed(
                customizations: delayed =>
                {
                    delayed.NumberOfRetries(settings.NumberOfSecondLevelRetries);
                    delayed.TimeIncrease(TimeSpan.FromMinutes(settings.IncreaseDelayTime));
                });

            configuration.HandleIncomingIntegrationTypes(new[]
            {
                typeof(PCounts)
            });

            transport.Routing().RegisterPublisher(eventType: typeof(IInventoryCollectorMessage), publisherEndpoint: settings.InventorySubscriberQueue);
            configuration.UsePersistence<InMemoryPersistence>();
            configuration.PurgeOnStartup(false);

            logger.Info(string.Format(
               "{0} registered under {1}",
               typeof(HandleIncomingIntegrationTypesConfiguration).FullName,
               System.Security.Principal.WindowsIdentity.GetCurrent().Name));

            logger.Info(string.Format("Inventory Collector started under {0}", System.Security.Principal.WindowsIdentity.GetCurrent().Name));
        }
    }

Handler

public async Task Handle(PCounts message, IMessageHandlerContext context)
{
    await context.Publish(message).ConfigureAwait(false);
}

(Dennis van der Stelt) #4

The code you shared shows only the publisher code, but not the subscriber part? Or is it the same endpoint? Are you publishing events to the same endpoint? The code also contains settings that you didn’t include. Especially the name of your endpoint and this line are of interest:

transport.Routing().RegisterPublisher

I’m not sure what it is you’re expecting and what actually is happening.

To help you, we’ve built a great tutorial that explains exactly how NServiceBus and publishing events work. You can find the tutorial here.

Please check out that tutorial and if you have questions about the tutorial, don’t hesitate to ask them as well. If you can’t fix your current issue, please share a solution with the issue reproducible in code. That way I can have a look at what is going on and provide more details on how to set up your endpoints.


(AK) #5

This is the endpoint. Here I am trying to publish the PCounts.xsd message.

public class EndpointConfig : IConfigureThisEndpoint
    {
        private static IEnumerable<string> MessageNamespaces
        {
            get
            {
                return new[]
                {
                    typeof(PCounts).Namespace,
                    typeof(IInventoryCollectorMessage).Namespace
                };
            }
        }

        public void Customize(EndpointConfiguration configuration)
        {
            Contract.Requires<ArgumentNullException>(configuration != null, "configuration");

            Logger.Initialize(name => new Log4NetLogger(name));
            log4net.Config.XmlConfigurator.Configure();
            //LogManager.Use<Log4NetFactory>();
            ILog logger = LogManager.GetLogger(typeof("cao_invoice_collector"));

            var builder = new ContainerBuilder();
            var container = builder.AddInstaller<InfrastructureInstaller>().Build();
            var settings = container.Resolve<IApplicationSettings>();
            EndPointInstance.Settings = settings;

            configuration.DefineEndpointName(settings.Endpoint);
            configuration.UseContainer<WindsorBuilder>(customizations => customizations.ExistingContainer(container));

            // Error Queue
            configuration.SendFailedMessagesTo(settings.ErrorQueue);
            configuration.Conventions().DefiningEventsAs(t => t.Namespace != null && MessageNamespaces.Contains(t.Namespace));
            configuration.UseSerialization<XmlSerializer>();
            var transport = configuration.UseTransport<MsmqTransport>().TransactionScopeOptions(
                timeout: TimeSpan.FromMinutes(settings.TransactionTimeoutMinutes));

            // First level retries
            configuration.Recoverability().Immediate(
                customizations: immediate =>
                {
                    immediate.NumberOfRetries(3);
                });

            // Maximum concurrency level
            configuration.LimitMessageProcessingConcurrencyTo(4);

            var recoverability = configuration.Recoverability();
            configuration.EnableInstallers();
            configuration.EnableFeature<EndPointInstance>();

            // Second level retries
            recoverability.Delayed(
                customizations: delayed =>
                {
                    delayed.NumberOfRetries(4);
                    delayed.TimeIncrease(TimeSpan.FromMinutes(5));
                });

            configuration.HandleIncomingIntegrationTypes(new[]
            {
                typeof(PCounts)
            });

            configuration.DisableFeature<AutoSubscribe>();
            configuration.UsePersistence<NHibernatePersistence, StorageType.Timeouts>(); // Use NHibernate for Timeout persistence.
            configuration.UsePersistence<NHibernatePersistence, StorageType.Subscriptions>();
            configuration.PurgeOnStartup(false);

            logger.Info(string.Format(
               "{0} registered under {1}",
               typeof(HandleIncomingIntegrationTypesConfiguration).FullName,
               System.Security.Principal.WindowsIdentity.GetCurrent().Name));

            logger.Info(string.Format("Inventory Collector started under {0}", System.Security.Principal.WindowsIdentity.GetCurrent().Name));
        }
    }

public async Task Handle(PCounts message, IMessageHandlerContext context)
{
    await context.Publish(message).ConfigureAwait(false);
}

(Dennis van der Stelt) #6

Did you manage to check out the tutorial already?


(Dennis van der Stelt) #7

I hope you’re successfully working with the tutorial and are able to figure out what publish/subscribe in NServiceBus is all about.

In the meantime, let me try to give some guidance:

  1. I see you’re using the NServiceBus.Host. I’m not sure if this code is somewhat older, but you should try to use self-hosting, as the host will be deprecated in the near future.
  2. As far as I know, this should not even compile: typeof("cao_invoice_collector"). Try to use the class you’re currently in, like LogManager.GetLogger(typeof(EndpointConfig));
  3. I have no idea what EndPointInstance does? The name doesn’t imply anything. The code implies there are settings involved? You’re hiding away quite a bit though. Is this the ideal way to configure an NServiceBus endpoint? Especially since there’s already so much code involved in configuring it?
  4. configuration.DefineEndpointName(settings.Endpoint); also hides a very important fact. Try to keep the name visible in code or, at a minimum, pass it as a parameter if this is generic code. I would rather have a generic method like public EndpointConfiguration GetEndpointConfiguration(string endpointName)
  5. SendFailedMessagesTo configures an error queue from configuration settings. Don’t you use ServiceControl and a single error queue for all endpoints? In most cases that’s preferable.
  6. When configuring conventions, is the MessageNamespaces really the list for all events, or for all messages? Are there no commands?
  7. Although it is possible to tune LimitMessageProcessingConcurrencyTo, why is it 4 here?
  8. What does HandleIncomingIntegrationTypes do?
  9. configuration.DisableFeature<AutoSubscribe>(); This means you’re not subscribing to any publishers, and I don’t see code to manually subscribe to certain events. It could be that you’re explicitly not subscribing to anything, but something might be off here. And since you’re investigating pub/sub, this could be the cause. However, I am still unaware of the actual problem you’re trying to solve.
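
To make points 1 and 4 a bit more concrete, here is a minimal self-hosting sketch, assuming NServiceBus 7 with the MSMQ transport package. The endpoint name "InventoryCollector", the "error" queue name, and the Program class are assumptions for illustration and not taken from the code in this thread. Logging, the container, recoverability, and the custom features from the original configuration are left out on purpose.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

public static class Program
{
    // Generic helper: the endpoint name is passed in explicitly (point 4)
    // instead of being hidden in a settings object.
    public static EndpointConfiguration GetEndpointConfiguration(string endpointName)
    {
        var configuration = new EndpointConfiguration(endpointName);
        configuration.UseTransport<MsmqTransport>();

        // In-memory persistence keeps the sketch small; subscriptions stored
        // this way are lost on restart, so it is not meant for production.
        configuration.UsePersistence<InMemoryPersistence>();

        // Assumed name for a single error queue shared by all endpoints.
        configuration.SendFailedMessagesTo("error");
        configuration.EnableInstallers();
        return configuration;
    }

    // Self-hosting (point 1): the process starts and stops the endpoint
    // itself instead of relying on NServiceBus.Host.
    public static async Task Main()
    {
        var configuration = GetEndpointConfiguration("InventoryCollector");
        var endpoint = await Endpoint.Start(configuration).ConfigureAwait(false);

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();

        await endpoint.Stop().ConfigureAwait(false);
    }
}
```

A publisher built this way does not need RegisterPublisher at all. That call belongs in the endpoints that want to receive the events.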