diff --git a/docs/src/main/asciidoc/sns.adoc b/docs/src/main/asciidoc/sns.adoc index 7147b88aa..325c60a92 100644 --- a/docs/src/main/asciidoc/sns.adoc +++ b/docs/src/main/asciidoc/sns.adoc @@ -218,6 +218,45 @@ class NotificationService { } ---- +==== SNS Async Template + +The starter automatically configures and registers a `SnsAsyncTemplate` bean when `SnsAsyncClient` bean is registered by user, providing non-blocking SNS operations. +`SnsAsyncTemplate` returns `CompletableFuture` for all operations, making it suitable for high-throughput async applications. + +It supports the same payload types as `SnsTemplate` and uses the same Spring `MessageConverter` hirarchy inside of `DefaultSnsPublishMessageConverter` to convert payloads. + +[source,java] +---- +import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; +import io.awspring.cloud.sns.core.async.PublishMessageResult; + +import java.util.concurrent.CompletableFuture; + +class NotificationService { + private final SnsAsyncTemplate snsAsyncTemplate; + + NotificationService(SnsAsyncTemplate snsAsyncTemplate) { + this.snsAsyncTemplate = snsAsyncTemplate; + } + + CompletableFuture> sendNotification() { + return snsAsyncTemplate.convertAndSend("topic-arn", "payload"); + } +} +---- + +To customize message conversion, provide a custom `SnsPublishMessageConverter` bean: + +[source,java] +---- +@Bean +public SnsPublishMessageConverter customConverter() { + return new MyCustomConverter(); +} +---- + +Autoconfiguration will pick it up and configure `SnsAsyncTemplate` automatically with it. + === Using SNS Client To have access to all lower level SNS operations, we recommend using `SnsClient` from AWS SDK. `SnsClient` bean is autoconfigured by `SnsAutoConfiguration`. diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java index 5a2badf57..6401407d4 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java @@ -30,6 +30,10 @@ import io.awspring.cloud.sns.core.SnsOperations; import io.awspring.cloud.sns.core.SnsTemplate; import io.awspring.cloud.sns.core.TopicArnResolver; +import io.awspring.cloud.sns.core.async.DefaultSnsPublishMessageConverter; +import io.awspring.cloud.sns.core.async.SnsAsyncOperations; +import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; +import io.awspring.cloud.sns.core.async.SnsPublishMessageConverter; import io.awspring.cloud.sns.core.batch.SnsBatchOperations; import io.awspring.cloud.sns.core.batch.SnsBatchTemplate; import io.awspring.cloud.sns.core.batch.converter.DefaultSnsMessageConverter; @@ -38,12 +42,15 @@ import io.awspring.cloud.sns.core.batch.executor.SequentialBatchExecutionStrategy; import io.awspring.cloud.sns.sms.SnsSmsOperations; import io.awspring.cloud.sns.sms.SnsSmsTemplate; + import java.util.List; import java.util.Optional; + import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; @@ -56,6 +63,7 @@ import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.web.method.support.HandlerMethodArgumentResolver; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import software.amazon.awssdk.services.sns.SnsAsyncClient; import software.amazon.awssdk.services.sns.SnsClient; import tools.jackson.databind.json.JsonMapper; @@ -71,22 +79,22 @@ * @author Mariusz Sondecki */ @AutoConfiguration -@ConditionalOnClass({ SnsClient.class, SnsTemplate.class }) -@EnableConfigurationProperties({ SnsProperties.class }) -@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnClass({SnsClient.class, SnsTemplate.class}) +@EnableConfigurationProperties({SnsProperties.class}) +@AutoConfigureAfter({CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class}) @ConditionalOnProperty(name = "spring.cloud.aws.sns.enabled", havingValue = "true", matchIfMissing = true) public class SnsAutoConfiguration { @ConditionalOnMissingBean @Bean public SnsClient snsClient(SnsProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, - ObjectProvider connectionDetails, - ObjectProvider snsClientCustomizers, - ObjectProvider awsSyncClientCustomizers) { + ObjectProvider connectionDetails, + ObjectProvider snsClientCustomizers, + ObjectProvider awsSyncClientCustomizers) { return awsClientBuilderConfigurer - .configureSyncClient(SnsClient.builder(), properties, connectionDetails.getIfAvailable(), - snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream()) - .build(); + .configureSyncClient(SnsClient.builder(), properties, connectionDetails.getIfAvailable(), + snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .build(); } @ConditionalOnMissingBean(SnsSmsOperations.class) @@ -95,18 +103,41 @@ public SnsSmsTemplate snsSmsTemplate(SnsClient snsClient) { return new SnsSmsTemplate(snsClient); } + @ConditionalOnClass({SnsAsyncClient.class, SnsAsyncTemplate.class}) + @Configuration + static class SnsAsyncTemplateConfiguration { + + @Bean + @ConditionalOnMissingBean(SnsAsyncOperations.class) + @ConditionalOnBean(SnsAsyncClient.class) + public SnsAsyncTemplate snsAsyncTemplate(SnsAsyncClient snsAsyncClient, Optional topicArnResolver, SnsPublishMessageConverter snsPublishMessageConverter) { + return topicArnResolver.map(it -> new SnsAsyncTemplate(snsAsyncClient, it, snsPublishMessageConverter)) + .orElseGet(() -> new SnsAsyncTemplate(snsAsyncClient, snsPublishMessageConverter)); + } + + @Bean + @ConditionalOnMissingBean(SnsPublishMessageConverter.class) + @ConditionalOnBean(SnsAsyncClient.class) + public SnsPublishMessageConverter snsPublishMessageConverter(Optional jsonMapper) { + JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter( + jsonMapper.orElseGet(JsonMapper::new)); + converter.setSerializedPayloadClass(String.class); + return new DefaultSnsPublishMessageConverter(converter); + } + } + @ConditionalOnClass(name = "tools.jackson.databind.json.JsonMapper") @Configuration static class SnsConfiguration { @ConditionalOnMissingBean(SnsOperations.class) @Bean public SnsTemplate snsTemplate(SnsClient snsClient, Optional jsonMapper, - Optional topicArnResolver, ObjectProvider interceptors) { + Optional topicArnResolver, ObjectProvider interceptors) { JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter( - jsonMapper.orElseGet(JsonMapper::new)); + jsonMapper.orElseGet(JsonMapper::new)); converter.setSerializedPayloadClass(String.class); SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter)) - .orElseGet(() -> new SnsTemplate(snsClient, converter)); + .orElseGet(() -> new SnsTemplate(snsClient, converter)); interceptors.forEach(snsTemplate::addChannelInterceptor); return snsTemplate; @@ -120,12 +151,12 @@ static class LegacyJackson2Configuration { @ConditionalOnMissingBean(SnsOperations.class) @Bean public SnsTemplate snsTemplate(SnsClient snsClient, Optional objectMapper, - Optional topicArnResolver, ObjectProvider interceptors) { + Optional topicArnResolver, ObjectProvider interceptors) { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setSerializedPayloadClass(String.class); objectMapper.ifPresent(converter::setObjectMapper); SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter)) - .orElseGet(() -> new SnsTemplate(snsClient, converter)); + .orElseGet(() -> new SnsTemplate(snsClient, converter)); interceptors.forEach(snsTemplate::addChannelInterceptor); return snsTemplate; @@ -173,8 +204,7 @@ public void addArgumentResolvers(List resolvers) resolvers.add(getNotificationHandlerMethodArgumentResolver(snsClient)); } }; - } - else if (JacksonPresent.isJackson2Present()) { + } else if (JacksonPresent.isJackson2Present()) { return new WebMvcConfigurer() { @Override public void addArgumentResolvers(List resolvers) { @@ -183,7 +213,7 @@ public void addArgumentResolvers(List resolvers) }; } throw new IllegalStateException( - "SecretsManagerPropertySource requires a Jackson 2 or Jackson 3 library on the classpath"); + "SnsWebMvc integration requires a Jackson 2 or Jackson 3 library on the classpath"); } } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java index c030db26d..b9eea255d 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java @@ -28,11 +28,14 @@ import io.awspring.cloud.sns.core.batch.SnsBatchTemplate; import io.awspring.cloud.sns.core.batch.converter.SnsMessageConverter; import io.awspring.cloud.sns.core.batch.executor.BatchExecutionStrategy; +import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; import io.awspring.cloud.sns.sms.SnsSmsOperations; import io.awspring.cloud.sns.sms.SnsSmsTemplate; import java.net.URI; + +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -43,6 +46,7 @@ import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.sns.SnsAsyncClient; import software.amazon.awssdk.services.sns.SnsClient; /** @@ -131,6 +135,37 @@ void customChannelInterceptorCanBeConfigured() { .run(context -> assertThat(context).hasSingleBean(CustomChannelInterceptor.class)); } + @Nested + class SnsAsyncTemplateTests { + + @Test + void snsAsyncTemplateIsNotCreatedWhenSnsAsyncClientIsNotPresent() { + contextRunner.run(context -> { + assertThat(context).hasSingleBean(SnsClient.class); + assertThat(context).hasSingleBean(SnsTemplate.class); + assertThat(context).doesNotHaveBean(SnsAsyncTemplate.class); + }); + } + + @Test + void snsAsyncTemplateIsCreatedWhenSnsAsyncClientIsPresent() { + contextRunner.withUserConfiguration(SnsAsyncClientConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SnsClient.class); + assertThat(context).hasSingleBean(SnsTemplate.class); + assertThat(context).hasSingleBean(SnsAsyncClient.class); + assertThat(context).hasSingleBean(SnsAsyncTemplate.class); + }); + } + + @Test + void bothAsyncTemplatesAndOperationsAreInjectable() { + contextRunner.withUserConfiguration(SnsAsyncClientConfiguration.class, InjectingAsyncTemplatesConfiguration.class).run(context -> { + assertThat(context.isRunning()).isTrue(); + assertThat(context).hasSingleBean(SnsAsyncTemplate.class); + }); + } + } + @Configuration(proxyBeanMethods = false) static class CustomTopicArnResolverConfiguration { @@ -198,4 +233,22 @@ ChannelInterceptor customChannelInterceptor() { static class CustomChannelInterceptor implements ChannelInterceptor { } + + @Configuration(proxyBeanMethods = false) + static class SnsAsyncClientConfiguration { + + @Bean + SnsAsyncClient snsAsyncClient() { + return mock(SnsAsyncClient.class); + } + } + + @Configuration(proxyBeanMethods = false) + static class InjectingAsyncTemplatesConfiguration { + @Bean + ApplicationRunner asyncRunner1(SnsAsyncTemplate snsAsyncTemplate) { + return args -> { + }; + } + } } diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java index 0e3000556..e471acc2f 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java @@ -40,8 +40,6 @@ */ public class TopicMessageChannel extends AbstractMessageChannel { - public JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create(); - private final SnsClient snsClient; private final Arn topicArn; diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java new file mode 100644 index 000000000..b36eec6ee --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java @@ -0,0 +1,100 @@ +package io.awspring.cloud.sns.core.async; + +import io.awspring.cloud.sns.core.SnsHeaderConverterUtil; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.converter.StringMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; + +/** + * Default implementation of {@link SnsPublishMessageConverter}. + * + * @author Matej Nedic + * @since 4.1.0 + */ +public class DefaultSnsPublishMessageConverter implements SnsPublishMessageConverter { + + private final MessageConverter messageConverter; + + public DefaultSnsPublishMessageConverter() { + this(null); + } + + public DefaultSnsPublishMessageConverter(@Nullable MessageConverter messageConverter) { + this.messageConverter = initMessageConverter(messageConverter); + } + + @Override + public PublishRequestMessagePair convert(Message originalMessage) { + Assert.notNull(originalMessage, "message cannot be null"); + + PublishRequest.Builder publishRequest = PublishRequest.builder(); + populateHeaders(publishRequest, originalMessage); + + Message convertedMessage = messageConverter.toMessage( + originalMessage.getPayload(), + originalMessage.getHeaders() + ); + publishRequest.message(convertedMessage.getPayload().toString()); + + return new PublishRequestMessagePair<>(publishRequest.build(), originalMessage); + } + + @Override + public PublishRequestMessagePair convert(T payload, Map headers) { + Assert.notNull(payload, "payload cannot be null"); + Assert.notNull(headers, "headers cannot be null"); + + Message originalMessage = MessageBuilder + .withPayload(payload) + .copyHeaders(headers) + .build(); + + return convert(originalMessage); + } + + private void populateHeaders(PublishRequest.Builder publishRequest, Message message) { + Map messageAttributes = SnsHeaderConverterUtil.toSnsMessageAttributes(message); + if (!messageAttributes.isEmpty()) { + publishRequest.messageAttributes(messageAttributes); + } + + Optional.ofNullable(message.getHeaders().get(NOTIFICATION_SUBJECT_HEADER, String.class)) + .ifPresent(publishRequest::subject); + + Optional.ofNullable(message.getHeaders().get(MESSAGE_GROUP_ID_HEADER, String.class)) + .ifPresent(publishRequest::messageGroupId); + + Optional.ofNullable(message.getHeaders().get(MESSAGE_DEDUPLICATION_ID_HEADER, String.class)) + .ifPresent(publishRequest::messageDeduplicationId); + } + + + private static CompositeMessageConverter initMessageConverter(@Nullable MessageConverter messageConverter) { + List converters = new ArrayList<>(); + + StringMessageConverter stringMessageConverter = new StringMessageConverter(); + stringMessageConverter.setSerializedPayloadClass(String.class); + converters.add(stringMessageConverter); + + if (messageConverter != null) { + converters.add(messageConverter); + } + + return new CompositeMessageConverter(converters); + } +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishMessageResult.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishMessageResult.java new file mode 100644 index 000000000..f48e83644 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishMessageResult.java @@ -0,0 +1,21 @@ +package io.awspring.cloud.sns.core.async; + +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; + +/** + * Result of an SNS operation. + * + * @param message the original message that was sent + * @param messageId the SNS message ID returned by AWS + * @param sequenceNumber the sequence number for FIFO topics (null for standard topics) + * @param the message payload type + * @author Matej Nedic + * @since 4.1.0 + */ +public record PublishMessageResult( + Message message, + String messageId, + @Nullable String sequenceNumber +) { +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java new file mode 100644 index 000000000..7eb24ea61 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java @@ -0,0 +1,19 @@ +package io.awspring.cloud.sns.core.async; + +import org.springframework.messaging.Message; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +/** + * Pair containing a PublishRequest and the original Spring Message. + * + * @param publishRequest the AWS SNS publish request + * @param originalMessage the original Spring message + * @param the message payload type + * @author Matej Nedic + * @since 4.1.0 + */ +record PublishRequestMessagePair( + PublishRequest publishRequest, + Message originalMessage +) { +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java new file mode 100644 index 000000000..9945a284a --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java @@ -0,0 +1,101 @@ +package io.awspring.cloud.sns.core.async; + +import io.awspring.cloud.sns.core.SnsNotification; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.core.MessagePostProcessor; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * High level asynchronous SNS operations. + * + * @author Matej Nedic + * @since 4.1.0 + */ +public interface SnsAsyncOperations { + + /** + * Sends a message to a destination. + * + * @param destination the destination name + * @param message the message to send + * @param the message payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> send(String destination, Message message) ; + + /** + * Converts and sends a payload to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload) ; + + /** + * Converts and sends a payload with headers to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param headers the headers to include + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers); + + /** + * Converts and sends a payload with a post processor to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param postProcessor the post processor to apply + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor); + + /** + * Converts and sends a payload with headers and a post processor to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param headers the headers to include + * @param postProcessor the post processor to apply + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, + @Nullable MessagePostProcessor postProcessor); + + /** + * Sends a notification with a message and subject to a destination. + * + * @param destinationName the destination name + * @param message the message to send + * @param subject the subject (can be null) + * @return a CompletableFuture with the result + */ + CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject); + + /** + * Sends a notification to a topic. + * + * @param topic the topic name + * @param notification the notification to send + * @param the notification payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> sendNotification(String topic, SnsNotification notification); + + /** + * Checks if a topic with the given ARN exists. + * + * @param topicArn the ARN of the topic + * @return a CompletableFuture with true if the topic exists, false otherwise + */ + CompletableFuture topicExists(String topicArn); +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java new file mode 100644 index 000000000..a0e984217 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java @@ -0,0 +1,158 @@ +package io.awspring.cloud.sns.core.async; + +import io.awspring.cloud.sns.core.CachingTopicArnResolver; +import io.awspring.cloud.sns.core.SnsAsyncTopicArnResolver; +import io.awspring.cloud.sns.core.SnsNotification; +import io.awspring.cloud.sns.core.TopicArnResolver; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.NotFoundException; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; + +/** + * Asynchronous template for SNS operations. + * + * @author Matej Nedic + * @since 4.1.0 + */ +public class SnsAsyncTemplate implements SnsAsyncOperations { + + private final SnsAsyncClient snsAsyncClient; + private final TopicArnResolver topicArnResolver; + private final SnsPublishMessageConverter snsPublishMessageConverter; + + public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient) { + this(snsAsyncClient, null); + } + + public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, @Nullable SnsPublishMessageConverter messageConverter) { + this(snsAsyncClient, new CachingTopicArnResolver(new SnsAsyncTopicArnResolver(snsAsyncClient)), messageConverter); + } + + public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, TopicArnResolver topicArnResolver, + @Nullable SnsPublishMessageConverter messageConverter) { + Assert.notNull(snsAsyncClient, "SnsAsyncClient must not be null"); + Assert.notNull(topicArnResolver, "TopicArnResolver must not be null"); + this.topicArnResolver = topicArnResolver; + this.snsAsyncClient = snsAsyncClient; + this.snsPublishMessageConverter = messageConverter == null ? new DefaultSnsPublishMessageConverter() : messageConverter; + } + + /** + * Sends a message to a destination. + * + * @param destination the destination name + * @param message the message to send + * @param the message payload type + * @return a CompletableFuture with the result + */ + @Override + public CompletableFuture> send(String destination, Message message) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(message, "message cannot be null"); + + Arn topicArn = topicArnResolver.resolveTopicArn(destination); + + PublishRequestMessagePair pair = snsPublishMessageConverter.convert(message); + PublishRequest request = pair.publishRequest() + .toBuilder() + .topicArn(topicArn.toString()) + .build(); + + return publish(request, pair.originalMessage()); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + + return convertAndSend(destination, payload, null, null); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + + return convertAndSend(destination, payload, headers, null); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + + return convertAndSend(destination, payload, null, postProcessor); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, @Nullable MessagePostProcessor postProcessor) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + + Message message = MessageBuilder + .withPayload(payload) + .copyHeaders(headers != null ? headers : Collections.emptyMap()) + .build(); + + if (postProcessor != null) { + message = (Message) postProcessor.postProcessMessage(message); + } + + return send(destination, message); + } + + @Override + public CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject) { + Assert.notNull(destinationName, "destinationName cannot be null"); + Assert.notNull(message, "message cannot be null"); + + return convertAndSend(destinationName, message, Collections.singletonMap(NOTIFICATION_SUBJECT_HEADER, subject)); + } + + @Override + public CompletableFuture> sendNotification(String topic, SnsNotification notification) { + Assert.notNull(topic, "topic cannot be null"); + Assert.notNull(notification, "notification cannot be null"); + + return convertAndSend(topic, notification.getPayload(), notification.getHeaders()); + } + + @Override + public CompletableFuture topicExists(String topicArn) { + Assert.notNull(topicArn, "topicArn must not be null"); + + return snsAsyncClient.getTopicAttributes(request -> request.topicArn(topicArn)) + .thenApply(response -> true) + .exceptionally(throwable -> { + if (throwable.getCause() instanceof NotFoundException) { + return false; + } + if (throwable.getCause() instanceof RuntimeException re) { + throw re; + } + throw new RuntimeException("Unexpected exception", throwable); + }); + } + + private CompletableFuture> publish(PublishRequest request, Message originalMessage) { + return snsAsyncClient.publish(request) + .thenApply(response -> new PublishMessageResult<>( + originalMessage, + response.messageId(), + response.sequenceNumber() + )); + } +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java new file mode 100644 index 000000000..c4423dcfb --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java @@ -0,0 +1,33 @@ +package io.awspring.cloud.sns.core.async; + +import org.springframework.messaging.Message; + +import java.util.Map; + +/** + * Converter for transforming Spring messages to SNS PublishRequest. + * + * @author Matej Nedic + * @since 4.1.0 + */ +public interface SnsPublishMessageConverter { + + /** + * Converts a Spring Message to a PublishRequest. + * + * @param message the message to convert + * @param the message payload type + * @return a pair containing the PublishRequest and the original message + */ + PublishRequestMessagePair convert(Message message); + + /** + * Converts a payload and headers to a PublishRequest. + * + * @param payload the payload to convert + * @param headers the headers to include + * @param the payload type + * @return a pair containing the PublishRequest and the constructed message + */ + PublishRequestMessagePair convert(T payload, Map headers); +} diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java new file mode 100644 index 000000000..ce2a9283d --- /dev/null +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sns.core.async; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.sns.Person; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.JacksonJsonMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import tools.jackson.databind.json.JsonMapper; + +/** + * Tests for {@link DefaultSnsPublishMessageConverter}. + * + * @author Matej Nedic + * @since 4.1.0 + */ +class DefaultSnsPublishMessageConverterTest { + + private final DefaultSnsPublishMessageConverter converter = createConverter(); + + private static DefaultSnsPublishMessageConverter createConverter() { + JacksonJsonMessageConverter jackson = new JacksonJsonMessageConverter(new JsonMapper()); + jackson.setSerializedPayloadClass(String.class); + return new DefaultSnsPublishMessageConverter(jackson); + } + + @Test + void convertsStringPayload() { + Message message = MessageBuilder.withPayload("hello").build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().message()).isEqualTo("hello"); + assertThat(result.originalMessage()).isEqualTo(message); + } + + @Test + void convertsPayloadWithHeaders() { + PublishRequestMessagePair result = converter.convert("payload", + Map.of("custom", "value", NOTIFICATION_SUBJECT_HEADER, "Subject")); + + assertThat(result.publishRequest().message()).isEqualTo("payload"); + assertThat(result.publishRequest().subject()).isEqualTo("Subject"); + assertThat(result.originalMessage().getPayload()).isEqualTo("payload"); + assertThat(result.originalMessage().getHeaders()).containsEntry("custom", "value"); + } + + @Test + void setsSubjectFromHeader() { + Message message = MessageBuilder.withPayload("test") + .setHeader(NOTIFICATION_SUBJECT_HEADER, "My Subject").build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().subject()).isEqualTo("My Subject"); + } + + @Test + void setsFifoHeaders() { + Message message = MessageBuilder.withPayload("fifo") + .setHeader(MESSAGE_GROUP_ID_HEADER, "grp-1") + .setHeader(MESSAGE_DEDUPLICATION_ID_HEADER, "ded-1").build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().messageGroupId()).isEqualTo("grp-1"); + assertThat(result.publishRequest().messageDeduplicationId()).isEqualTo("ded-1"); + } + + @Test + void convertsCustomHeadersToMessageAttributes() { + Message message = MessageBuilder.withPayload("test") + .setHeader("priority", "high") + .setHeader("count", 42).build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().messageAttributes()).containsKey("priority"); + assertThat(result.publishRequest().messageAttributes()).containsKey("count"); + } + + @Test + void serializesComplexPayload() { + Person person = new Person("John"); + Message message = MessageBuilder.withPayload(person).build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().message()).contains("John"); + assertThat(result.originalMessage().getPayload()).isEqualTo(person); + } + + @Test + void handlesEmptyHeaders() { + PublishRequestMessagePair result = converter.convert("payload", Map.of()); + + assertThat(result.publishRequest().message()).isEqualTo("payload"); + assertThat(result.originalMessage().getPayload()).isEqualTo("payload"); + } +} diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java new file mode 100644 index 000000000..4860f5ae4 --- /dev/null +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -0,0 +1,258 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sns.core.async; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.awspring.cloud.sns.Person; +import io.awspring.cloud.sns.core.SnsNotification; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import net.bytebuddy.utility.RandomString; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.converter.JacksonJsonMessageConverter; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.CreateTopicRequest; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import tools.jackson.databind.JsonNode; +import tools.jackson.databind.json.JsonMapper; + +/** + * Integration tests for {@link SnsAsyncTemplate} + * + * @author Matej Nedic + * @since 4.1.0 + */ +@Testcontainers +class SnsAsyncTemplateIntegrationTest { + + private static final JsonMapper jsonMapper = JsonMapper.builder().build(); + + @Container + static LocalStackContainer localstack = new LocalStackContainer( + DockerImageName.parse("localstack/localstack:4.4.0")); + + private static SnsAsyncClient snsAsyncClient; + private static SqsClient sqsClient; + private static SnsAsyncTemplate snsAsyncTemplate; + private static String standardQueueUrl; + private static String standardTopicArn; + private static String fifoQueueUrl; + private static String fifoTopicArn; + + @BeforeAll + static void setUp() throws Exception { + StaticCredentialsProvider credentials = StaticCredentialsProvider + .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); + + snsAsyncClient = SnsAsyncClient.builder() + .endpointOverride(localstack.getEndpoint()) + .region(Region.of(localstack.getRegion())) + .credentialsProvider(credentials).build(); + + sqsClient = SqsClient.builder() + .endpointOverride(localstack.getEndpoint()) + .region(Region.of(localstack.getRegion())) + .credentialsProvider(credentials).build(); + + standardQueueUrl = sqsClient.createQueue(r -> r.queueName("async-standard-queue")).queueUrl(); + String standardQueueArn = sqsClient + .getQueueAttributes(r -> r.queueUrl(standardQueueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .attributes().get(QueueAttributeName.QUEUE_ARN); + standardTopicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name("async-standard-topic").build()) + .get().topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(standardTopicArn).protocol("sqs").endpoint(standardQueueArn)).get(); + + + fifoQueueUrl = sqsClient + .createQueue(r -> r.queueName("async-fifo-queue.fifo") + .attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))) + .queueUrl(); + String fifoQueueArn = sqsClient + .getQueueAttributes(r -> r.queueUrl(fifoQueueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .attributes().get(QueueAttributeName.QUEUE_ARN); + fifoTopicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name("async-fifo-topic.fifo") + .attributes(Map.of("FifoTopic", "true", "ContentBasedDeduplication", "true")).build()) + .get().topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(fifoTopicArn).protocol("sqs").endpoint(fifoQueueArn)).get(); + + JacksonJsonMessageConverter jackson = new JacksonJsonMessageConverter(); + jackson.setSerializedPayloadClass(String.class); + snsAsyncTemplate = new SnsAsyncTemplate(snsAsyncClient, + new DefaultSnsPublishMessageConverter(jackson)); + } + + private static List receiveAll(String queueUrl) { + List messages = new ArrayList<>(); + await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(500)).until(() -> { + var response = sqsClient.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl) + .maxNumberOfMessages(10).waitTimeSeconds(2).messageAttributeNames("All").build()); + if (response.hasMessages() && !response.messages().isEmpty()) { + messages.addAll(response.messages()); + } + return !messages.isEmpty(); + }); + return messages; + } + + @Nested + class StandardTopicTests { + + @AfterEach + void purgeQueue() { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(standardQueueUrl).build()); + } + + @Test + void convertAndSendWithHeaders() throws Exception { + PublishMessageResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, "async message", + Map.of("custom-header", "custom-value")).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.sequenceNumber()).isNull(); + assertThat(result.message().getPayload()).isEqualTo("async message"); + assertThat(result.message().getHeaders()).containsEntry("custom-header", "custom-value"); + + var received = receiveAll(standardQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + assertThat(snsEnvelope.get("Message").asString()).isEqualTo("async message"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("custom-header").get("Value").asString()).isEqualTo("custom-value"); + } + + @Test + void convertAndSendWithPersonPayload() throws Exception { + Person person = new Person("John"); + PublishMessageResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, person, + Map.of("person-type", "employee", NOTIFICATION_SUBJECT_HEADER, "Person Update")).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.message().getPayload()).isEqualTo(person); + + var received = receiveAll(standardQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + Person receivedPerson = jsonMapper.readValue(snsEnvelope.get("Message").asString(), Person.class); + assertThat(receivedPerson.getName()).isEqualTo("John"); + assertThat(snsEnvelope.get("Subject").asString()).isEqualTo("Person Update"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("person-type").get("Value").asString()).isEqualTo("employee"); + } + + @Test + void sendNotificationWithSubject() throws Exception { + PublishMessageResult result = snsAsyncTemplate.sendNotification(standardTopicArn, + "message with subject", "Test Subject").get(); + + assertThat(result.messageId()).isNotNull(); + + var received = receiveAll(standardQueueUrl); + JsonNode body = jsonMapper.readTree(received.get(0).body()); + assertThat(body.get("Message").asString()).isEqualTo("message with subject"); + assertThat(body.get("Subject").asString()).isEqualTo("Test Subject"); + } + + @Test + void sendNotificationWithSnsNotification() throws Exception { + SnsNotification notification = SnsNotification.builder("notification payload") + .header(NOTIFICATION_SUBJECT_HEADER, "Notification Subject") + .header("notification-type", "alert").build(); + + PublishMessageResult result = snsAsyncTemplate.sendNotification(standardTopicArn, notification).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.message().getPayload()).isEqualTo("notification payload"); + + var received = receiveAll(standardQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + assertThat(snsEnvelope.get("Message").asString()).isEqualTo("notification payload"); + assertThat(snsEnvelope.get("Subject").asString()).isEqualTo("Notification Subject"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("notification-type").get("Value").asString()).isEqualTo("alert"); + } + } + + @Nested + class FifoTopicTests { + + @AfterEach + void purgeQueue() { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(fifoQueueUrl).build()); + } + + @Test + void convertAndSendWithFifoHeaders() throws Exception { + PublishMessageResult result = snsAsyncTemplate.convertAndSend(fifoTopicArn, "fifo message", + Map.of(MESSAGE_GROUP_ID_HEADER, "group-1", + MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-1", + "custom-fifo-header", "fifo-value")).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.message().getPayload()).isEqualTo("fifo message"); + assertThat(result.message().getHeaders()) + .containsEntry(MESSAGE_GROUP_ID_HEADER, "group-1") + .containsEntry(MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-1"); + + var received = receiveAll(fifoQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + assertThat(snsEnvelope.get("Message").asString()).isEqualTo("fifo message"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("custom-fifo-header").get("Value").asString()).isEqualTo("fifo-value"); + } + } + + @Test + void topicExistsReturnsTrueForExistingTopic() throws Exception { + String topicArn = snsAsyncClient + .createTopic(r -> r.name(RandomString.make())).get().topicArn(); + + assertThat(snsAsyncTemplate.topicExists(topicArn).get()).isTrue(); + } + + @Test + void topicExistsReturnsFalseForNonExistingTopic() throws Exception { + assertThat(snsAsyncTemplate.topicExists( + "arn:aws:sns:us-east-1:000000000000:nope").get()).isFalse(); + } +}