AWS SES integration with native messages

Hello, we are using an NServiceBus endpoint to send email via AWS SES. We have responses coming from SES and landing in a generic SQS queue. We’d like to have those responses land in our NServiceBus queue. It seems that native message handling can be done but it requires the MessageTypeFullName attribute. There doesn’t appear to be support for injecting this attribute on the AWS side. I’m presently looking at “message mutators”. This might work but requires introspecting every incoming message (and adding the attribute somehow).

Is there anyone who has done this kind of thing and can give me some direction?

Thanks.

Hi @dastultz

I think there are several options for making this scenario work. Here are the ones that come to mind immediately:

  • Adding the necessary metadata/headers as part of the message pipeline. This can be done via a mutator or a plain (early) pipeline behavior. As you already stated, this is code that will be executed for every incoming message but I’d assume that the additional code being run might not be that problematic unless your endpoint is handling a lot of other messages. Overall, this might be the easiest approach.
  • A custom AWS Lambda function could pick up the messages from the special queue, modify them as needed and forward them to the handling endpoint’s input queue. The main benefit is that the integration code runs only for the messages that need it and is deployed separately from your endpoint, but it means more infrastructure to maintain and deploy.
  • Satellites might be an alternative to an AWS Lambda function: a satellite can consume a specific queue and process its messages. The satellite pipeline API is fairly advanced and low level, so it’s definitely more effort than behaviors/mutators, but it doesn’t require additional infrastructure like a custom Lambda function.

By the way, we do have some documentation and samples about native integration with SQS messages, in case you haven’t seen them already: the “AmazonSQS native integration” page and the “AmazonSQS transport native integration sample”, both on Particular Docs. Maybe those links can also be useful to you.

Hi Tim, I think the first bullet point is what I really want to do. As per the documentation you linked, “a MessageTypeFullName message attribute must be present.” It is not present, and I’d like to do whatever transformation is required upstream of deserialization, including adding the attribute.

I presently have mutators for IMutateIncomingMessages and IMutateIncomingTransportMessages as well as a behavior for IIncomingPhysicalMessageContext. These implementations take no action other than to print to the console. When I send a message to the bus via my application all three print statements are seen indicating each component is invoked.

When I send a message to the queue with the AWS CLI (imitating an SNS notification) the processing throws an exception. If I do not include any message attributes I get this:

[2022/09/09 08:48:38 Warning] [NServiceBus.Transport.SQS.MessagePump] Treating message with a664d7ae-594f-45f1-9e0a-de870bd2dcb5 as a poison message. Moving to error queue. 
System.ArgumentNullException: Value cannot be null. (Parameter 's')
   at System.Convert.FromBase64String(String s)
   at NServiceBus.Transport.SQS.Extensions.MessageExtensions.RetrieveBody(TransportMessage transportMessage, IAmazonS3 s3Client, TransportConfiguration transportConfiguration, CancellationToken cancellationToken) in /_/src/NServiceBus.Transport.SQS/Extensions/MessageExtensions.cs:line 21
   at NServiceBus.Transport.SQS.InputQueuePump.ProcessMessage(Message receivedMessage, SemaphoreSlim processingSemaphoreSlim, CancellationToken token) in /_/src/NServiceBus.Transport.SQS/InputQueuePump.cs:line 202

If I include MessageTypeFullName (that does not yet resolve to an existing class name), I get this:

[2022/09/09 08:37:29 Warning] [NServiceBus.Transport.SQS.MessagePump] Treating message with bad2ae2d-e442-44a8-987e-e5477aee6a65 as a poison message. Moving to error queue. 
System.FormatException: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
   at System.Convert.FromBase64_ComputeResultLength(Char* inputPtr, Int32 inputLength)
   at System.Convert.FromBase64CharPtr(Char* inputPtr, Int32 inputLength)
   at System.Convert.FromBase64String(String s)
   at NServiceBus.Transport.SQS.Extensions.MessageExtensions.RetrieveBody(TransportMessage transportMessage, IAmazonS3 s3Client, TransportConfiguration transportConfiguration, CancellationToken cancellationToken) in /_/src/NServiceBus.Transport.SQS/Extensions/MessageExtensions.cs:line 21
   at NServiceBus.Transport.SQS.InputQueuePump.ProcessMessage(Message receivedMessage, SemaphoreSlim processingSemaphoreSlim, CancellationToken token) in /_/src/NServiceBus.Transport.SQS/InputQueuePump.cs:line 202

So both paths seem to rely on some part of the incoming message being base 64 encoded. I’d like to take any arbitrary incoming message and transform/amend it for NServiceBus to deserialize into an object instance (although my specific current need is for SNS notifications).
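As a rough illustration of the two failures above (a Python analogy, not the transport’s actual code), a raw JSON notification body is not valid Base64, and a missing body cannot be decoded at all. The `is_valid_base64` helper and the sample payload are hypothetical and exist only for this sketch:

```python
import base64
import binascii
import json


def is_valid_base64(text):
    """Return True only if text is a strictly valid Base64 string."""
    if text is None:
        # Comparable to the ArgumentNullException case: there is nothing to decode.
        return False
    try:
        base64.b64decode(text, validate=True)
        return True
    except (binascii.Error, ValueError):
        # Comparable to the FormatException case: characters outside the Base64 alphabet.
        return False


# Hypothetical SNS-style notification body, sent as plain JSON.
raw_body = json.dumps({"Type": "Notification", "Message": "hello"})
encoded_body = base64.b64encode(raw_body.encode("utf-8")).decode("ascii")

print(is_valid_base64(None))          # missing body
print(is_valid_base64(raw_body))      # plain JSON, contains '{' and '"'
print(is_valid_base64(encoded_body))  # the same JSON after Base64 encoding
```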

Do you have any further direction to suggest? Thank you.
/Daryl

I discovered this other post that appears to describe the problem I’m having. The post suggests the problem is in NSB 6 but not 7, but then that idea may be retracted at the end. I’m using NSB 7 with NServiceBus.AmazonSQS 5.

Is this a hard limitation? This isn’t really “native SQS integration” if all messages need to be in a certain format.

We assume the body is a Base64-encoded string, which seems off considering your use case. I raised an issue to keep track of that: “SQS Transport limits native integration options by assuming message body is a Base64 encoded string”, Issue #1579 in Particular/NServiceBus.AmazonSQS on GitHub.

As this comment already points out, there currently seems to be no workaround at the endpoint level for handling native messages that aren’t Base64 encoded.

This is very unfortunate, and I hope we’ll be able to support better native integration with the SQS transport. For now, the only viable workaround seems to be a custom AWS Lambda function that transforms the message into the required NServiceBus message format and Base64-encodes the body.
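A minimal sketch of such a Lambda function in Python might look like the following. It assumes the transport expects a JSON envelope with `Headers` and a Base64-encoded `Body`, as the `TransportMessage` and `RetrieveBody` frames in the stack traces suggest; verify the exact envelope against the transport version in use. The environment variable names `TARGET_QUEUE_URL` and `MESSAGE_TYPE` and the type name `MyApp.Messages.SesNotification` are hypothetical.

```python
import base64
import json
import os
import uuid


def wrap_for_nservicebus(raw_body, message_type):
    """Wrap a native SQS body in an NServiceBus-style JSON envelope (assumed format)."""
    envelope = {
        "Headers": {
            "NServiceBus.MessageId": str(uuid.uuid4()),
            # Fully qualified name of the message class the endpoint should handle.
            "NServiceBus.EnclosedMessageTypes": message_type,
            "NServiceBus.ContentType": "application/json",
        },
        # The transport Base64-decodes the body, so encode the raw payload here.
        "Body": base64.b64encode(raw_body.encode("utf-8")).decode("ascii"),
    }
    return json.dumps(envelope)


def handler(event, context):
    """Lambda entry point for an SQS trigger on the generic queue."""
    import boto3  # imported lazily so the transform above can be tested without AWS

    sqs = boto3.client("sqs")
    queue_url = os.environ["TARGET_QUEUE_URL"]  # hypothetical: the endpoint's input queue
    message_type = os.environ["MESSAGE_TYPE"]   # hypothetical: e.g. MyApp.Messages.SesNotification
    for record in event["Records"]:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=wrap_for_nservicebus(record["body"], message_type),
        )
```

Keeping the transformation in a pure function separate from the handler makes it possible to test the envelope locally before deploying anything.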