From 947dd965bcc27ff356daf097773009f1b92f3efe Mon Sep 17 00:00:00 2001 From: matejnedic Date: Fri, 6 Feb 2026 23:55:47 +0100 Subject: [PATCH 01/10] Initial commit --- docs/src/main/asciidoc/sns.adoc | 39 +++ .../sns/SnsAutoConfiguration.java | 63 +++- .../sns/SnsAutoConfigurationTest.java | 53 +++ .../sns/core/SnsHeaderConverterUtil.java | 120 +++++++ .../DefaultSnsPublishMessageConverter.java | 94 ++++++ .../core/async/PublishRequestMessagePair.java | 19 ++ .../sns/core/async/SnsAsyncOperations.java | 103 ++++++ .../sns/core/async/SnsAsyncTemplate.java | 128 ++++++++ .../async/SnsPublishMessageConverter.java | 33 ++ .../cloud/sns/core/async/SnsResult.java | 21 ++ ...DefaultSnsPublishMessageConverterTest.java | 147 +++++++++ .../SnsAsyncTemplateIntegrationTest.java | 303 ++++++++++++++++++ 12 files changed, 1106 insertions(+), 17 deletions(-) create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java create mode 100644 spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java create mode 100644 spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java create mode 100644 spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java diff --git a/docs/src/main/asciidoc/sns.adoc b/docs/src/main/asciidoc/sns.adoc index 102fef9000..53e67510c8 100644 --- a/docs/src/main/asciidoc/sns.adoc +++ b/docs/src/main/asciidoc/sns.adoc @@ -147,6 +147,45 @@ class NotificationService { } ---- +==== SNS Async Template + +The starter automatically configures and registers a `SnsAsyncTemplate` bean when `SnsAsyncClient` bean is registered by user, providing non-blocking SNS operations. +`SnsAsyncTemplate` returns `CompletableFuture` for all operations, making it suitable for high-throughput async applications. + +It supports the same payload types as `SnsTemplate` and uses the same Spring `MessageConverter` hirarchy inside of `DefaultSnsPublishMessageConverter` to convert payloads. + +[source,java] +---- +import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; +import io.awspring.cloud.sns.core.async.SnsResult; + +import java.util.concurrent.CompletableFuture; + +class NotificationService { + private final SnsAsyncTemplate snsAsyncTemplate; + + NotificationService(SnsAsyncTemplate snsAsyncTemplate) { + this.snsAsyncTemplate = snsAsyncTemplate; + } + + CompletableFuture> sendNotification() { + return snsAsyncTemplate.convertAndSend("topic-arn", "payload"); + } +} +---- + +To customize message conversion, provide a custom `SnsPublishMessageConverter` bean: + +[source,java] +---- +@Bean +public SnsPublishMessageConverter customConverter() { + return new MyCustomConverter(); +} +---- + +Autoconfiguration will pick it up and configure `SnsAsyncTemplate` automatically with it. + === Using SNS Client To have access to all lower level SNS operations, we recommend using `SnsClient` from AWS SDK. `SnsClient` bean is autoconfigured by `SnsAutoConfiguration`. diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java index 40e22e217e..9fb85062cc 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java @@ -28,14 +28,21 @@ import io.awspring.cloud.sns.core.SnsOperations; import io.awspring.cloud.sns.core.SnsTemplate; import io.awspring.cloud.sns.core.TopicArnResolver; +import io.awspring.cloud.sns.core.async.DefaultSnsPublishMessageConverter; +import io.awspring.cloud.sns.core.async.SnsAsyncOperations; +import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; +import io.awspring.cloud.sns.core.async.SnsPublishMessageConverter; import io.awspring.cloud.sns.sms.SnsSmsOperations; import io.awspring.cloud.sns.sms.SnsSmsTemplate; + import java.util.List; import java.util.Optional; + import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; @@ -48,6 +55,7 @@ import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.web.method.support.HandlerMethodArgumentResolver; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import software.amazon.awssdk.services.sns.SnsAsyncClient; import software.amazon.awssdk.services.sns.SnsClient; import tools.jackson.databind.json.JsonMapper; @@ -63,22 +71,22 @@ * @author Mariusz Sondecki */ @AutoConfiguration -@ConditionalOnClass({ SnsClient.class, SnsTemplate.class }) -@EnableConfigurationProperties({ SnsProperties.class }) -@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnClass({SnsClient.class, SnsTemplate.class}) +@EnableConfigurationProperties({SnsProperties.class}) +@AutoConfigureAfter({CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class}) @ConditionalOnProperty(name = "spring.cloud.aws.sns.enabled", havingValue = "true", matchIfMissing = true) public class SnsAutoConfiguration { @ConditionalOnMissingBean @Bean public SnsClient snsClient(SnsProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, - ObjectProvider connectionDetails, - ObjectProvider snsClientCustomizers, - ObjectProvider awsSyncClientCustomizers) { + ObjectProvider connectionDetails, + ObjectProvider snsClientCustomizers, + ObjectProvider awsSyncClientCustomizers) { return awsClientBuilderConfigurer - .configureSyncClient(SnsClient.builder(), properties, connectionDetails.getIfAvailable(), - snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream()) - .build(); + .configureSyncClient(SnsClient.builder(), properties, connectionDetails.getIfAvailable(), + snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .build(); } @ConditionalOnMissingBean(SnsSmsOperations.class) @@ -87,18 +95,40 @@ public SnsSmsTemplate snsSmsTemplate(SnsClient snsClient) { return new SnsSmsTemplate(snsClient); } + @Configuration + static class SnsAsyncTemplateConfiguration { + + @Bean + @ConditionalOnMissingBean(SnsAsyncOperations.class) + @ConditionalOnBean(SnsAsyncClient.class) + public SnsAsyncTemplate snsAsyncTemplate(SnsAsyncClient snsAsyncClient, Optional topicArnResolver, SnsPublishMessageConverter snsPublishMessageConverter) { + return topicArnResolver.map(it -> new SnsAsyncTemplate(snsAsyncClient, it, snsPublishMessageConverter)) + .orElseGet(() -> new SnsAsyncTemplate(snsAsyncClient, snsPublishMessageConverter)); + } + + @Bean + @ConditionalOnMissingBean(SnsPublishMessageConverter.class) + @ConditionalOnBean(SnsAsyncClient.class) + public SnsPublishMessageConverter snsPublishMessageConverter(Optional jsonMapper) { + JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter( + jsonMapper.orElseGet(JsonMapper::new)); + converter.setSerializedPayloadClass(String.class); + return new DefaultSnsPublishMessageConverter(converter); + } + } + @ConditionalOnClass(name = "tools.jackson.databind.json.JsonMapper") @Configuration static class SnsConfiguration { @ConditionalOnMissingBean(SnsOperations.class) @Bean public SnsTemplate snsTemplate(SnsClient snsClient, Optional jsonMapper, - Optional topicArnResolver, ObjectProvider interceptors) { + Optional topicArnResolver, ObjectProvider interceptors) { JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter( - jsonMapper.orElseGet(JsonMapper::new)); + jsonMapper.orElseGet(JsonMapper::new)); converter.setSerializedPayloadClass(String.class); SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter)) - .orElseGet(() -> new SnsTemplate(snsClient, converter)); + .orElseGet(() -> new SnsTemplate(snsClient, converter)); interceptors.forEach(snsTemplate::addChannelInterceptor); return snsTemplate; @@ -112,12 +142,12 @@ static class LegacyJackson2Configuration { @ConditionalOnMissingBean(SnsOperations.class) @Bean public SnsTemplate snsTemplate(SnsClient snsClient, Optional objectMapper, - Optional topicArnResolver, ObjectProvider interceptors) { + Optional topicArnResolver, ObjectProvider interceptors) { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setSerializedPayloadClass(String.class); objectMapper.ifPresent(converter::setObjectMapper); SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter)) - .orElseGet(() -> new SnsTemplate(snsClient, converter)); + .orElseGet(() -> new SnsTemplate(snsClient, converter)); interceptors.forEach(snsTemplate::addChannelInterceptor); return snsTemplate; @@ -137,8 +167,7 @@ public void addArgumentResolvers(List resolvers) resolvers.add(getNotificationHandlerMethodArgumentResolver(snsClient)); } }; - } - else if (JacksonPresent.isJackson2Present()) { + } else if (JacksonPresent.isJackson2Present()) { return new WebMvcConfigurer() { @Override public void addArgumentResolvers(List resolvers) { @@ -147,7 +176,7 @@ public void addArgumentResolvers(List resolvers) }; } throw new IllegalStateException( - "SecretsManagerPropertySource requires a Jackson 2 or Jackson 3 library on the classpath"); + "SnsWebMvc integration requires a Jackson 2 or Jackson 3 library on the classpath"); } } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java index 0ea9cf7c5d..7cdf2b8181 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfigurationTest.java @@ -25,9 +25,12 @@ import io.awspring.cloud.sns.core.SnsOperations; import io.awspring.cloud.sns.core.SnsTemplate; import io.awspring.cloud.sns.core.TopicArnResolver; +import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; import io.awspring.cloud.sns.sms.SnsSmsOperations; import io.awspring.cloud.sns.sms.SnsSmsTemplate; import java.net.URI; + +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -38,6 +41,7 @@ import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.sns.SnsAsyncClient; import software.amazon.awssdk.services.sns.SnsClient; /** @@ -123,6 +127,37 @@ void customChannelInterceptorCanBeConfigured() { .run(context -> assertThat(context).hasSingleBean(CustomChannelInterceptor.class)); } + @Nested + class SnsAsyncTemplateTests { + + @Test + void snsAsyncTemplateIsNotCreatedWhenSnsAsyncClientIsNotPresent() { + contextRunner.run(context -> { + assertThat(context).hasSingleBean(SnsClient.class); + assertThat(context).hasSingleBean(SnsTemplate.class); + assertThat(context).doesNotHaveBean(SnsAsyncTemplate.class); + }); + } + + @Test + void snsAsyncTemplateIsCreatedWhenSnsAsyncClientIsPresent() { + contextRunner.withUserConfiguration(SnsAsyncClientConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SnsClient.class); + assertThat(context).hasSingleBean(SnsTemplate.class); + assertThat(context).hasSingleBean(SnsAsyncClient.class); + assertThat(context).hasSingleBean(SnsAsyncTemplate.class); + }); + } + + @Test + void bothAsyncTemplatesAndOperationsAreInjectable() { + contextRunner.withUserConfiguration(SnsAsyncClientConfiguration.class, InjectingAsyncTemplatesConfiguration.class).run(context -> { + assertThat(context.isRunning()).isTrue(); + assertThat(context).hasSingleBean(SnsAsyncTemplate.class); + }); + } + } + @Configuration(proxyBeanMethods = false) static class CustomTopicArnResolverConfiguration { @@ -189,4 +224,22 @@ ChannelInterceptor customChannelInterceptor() { static class CustomChannelInterceptor implements ChannelInterceptor { } + + @Configuration(proxyBeanMethods = false) + static class SnsAsyncClientConfiguration { + + @Bean + SnsAsyncClient snsAsyncClient() { + return mock(SnsAsyncClient.class); + } + } + + @Configuration(proxyBeanMethods = false) + static class InjectingAsyncTemplatesConfiguration { + @Bean + ApplicationRunner asyncRunner1(SnsAsyncTemplate snsAsyncTemplate) { + return args -> { + }; + } + } } diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java new file mode 100644 index 0000000000..a50b02b4b7 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java @@ -0,0 +1,120 @@ +package io.awspring.cloud.sns.core; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.NumberUtils; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; + +/** + * @author Agim Emruli + * @author Alain Sahli + * @author Gyozo Papp + * @author Matej Nedic + */ +public class SnsHeaderConverterUtil { + private static final Log logger = LogFactory.getLog(SnsHeaderConverterUtil.class); + public static JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create(); + + public static Map toSnsMessageAttributes(Message message) { + HashMap messageAttributes = new HashMap<>(); + for (Map.Entry messageHeader : message.getHeaders().entrySet()) { + String messageHeaderName = messageHeader.getKey(); + Object messageHeaderValue = messageHeader.getValue(); + + if (isSkipHeader(messageHeaderName)) { + continue; + } + + if (MessageHeaders.CONTENT_TYPE.equals(messageHeaderName) && messageHeaderValue != null) { + messageAttributes.put(messageHeaderName, getContentTypeMessageAttribute(messageHeaderValue)); + } else if (MessageHeaders.ID.equals(messageHeaderName) && messageHeaderValue != null) { + messageAttributes.put(messageHeaderName, getStringMessageAttribute(messageHeaderValue.toString())); + } else if (MessageHeaders.TIMESTAMP.equals(messageHeaderName) && messageHeaderValue != null) { + messageAttributes.put(messageHeaderName, getDetailedNumberMessageAttribute(messageHeaderValue)); + } else if (messageHeaderValue instanceof String) { + messageAttributes.put(messageHeaderName, getStringMessageAttribute((String) messageHeaderValue)); + } else if (messageHeaderValue instanceof Number) { + messageAttributes.put(messageHeaderName, getDetailedNumberMessageAttribute(messageHeaderValue)); + } else if (messageHeaderValue instanceof ByteBuffer) { + messageAttributes.put(messageHeaderName, getBinaryMessageAttribute((ByteBuffer) messageHeaderValue)); + } else if (messageHeaderValue instanceof List) { + messageAttributes.put(messageHeaderName, getStringArrayMessageAttribute((List) messageHeaderValue)); + } else { + logger.warn(String.format( + "Message header with name '%s' and type '%s' cannot be sent as" + + " message attribute because it is not supported by SNS.", + messageHeaderName, messageHeaderValue != null ? messageHeaderValue.getClass().getName() : "")); + } + } + + return messageAttributes; + } + + private static boolean isSkipHeader(String headerName) { + return NOTIFICATION_SUBJECT_HEADER.equals(headerName) || MESSAGE_GROUP_ID_HEADER.equals(headerName) + || MESSAGE_DEDUPLICATION_ID_HEADER.equals(headerName); + } + + private static MessageAttributeValue getStringArrayMessageAttribute(List messageHeaderValue) { + String stringValue = messageHeaderValue.stream().map(item -> { + StringBuilder sb = new StringBuilder(); + jsonStringEncoder.quoteAsString(item.toString(), sb); + return "\"" + sb + "\""; + }).collect(Collectors.joining(", ", "[", "]")); + + return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING_ARRAY).stringValue(stringValue) + .build(); + } + + private static MessageAttributeValue getBinaryMessageAttribute(ByteBuffer messageHeaderValue) { + return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.BINARY) + .binaryValue(SdkBytes.fromByteBuffer(messageHeaderValue)).build(); + } + + @Nullable + private static MessageAttributeValue getContentTypeMessageAttribute(Object messageHeaderValue) { + if (messageHeaderValue instanceof MimeType) { + return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING) + .stringValue(messageHeaderValue.toString()).build(); + } else if (messageHeaderValue instanceof String) { + return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING) + .stringValue((String) messageHeaderValue).build(); + } + return null; + } + + private static MessageAttributeValue getStringMessageAttribute(String messageHeaderValue) { + return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING) + .stringValue(messageHeaderValue).build(); + } + + private static MessageAttributeValue getDetailedNumberMessageAttribute(Object messageHeaderValue) { + Assert.isTrue(NumberUtils.STANDARD_NUMBER_TYPES.contains(messageHeaderValue.getClass()), + "Only standard number types are accepted as message header."); + + return MessageAttributeValue.builder() + .dataType(MessageAttributeDataTypes.NUMBER + "." + messageHeaderValue.getClass().getName()) + .stringValue(messageHeaderValue.toString()).build(); + } + + private static MessageAttributeValue getNumberMessageAttribute(Object messageHeaderValue) { + return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.NUMBER) + .stringValue(messageHeaderValue.toString()).build(); + } +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java new file mode 100644 index 0000000000..e661092d6b --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java @@ -0,0 +1,94 @@ +package io.awspring.cloud.sns.core.async; + +import io.awspring.cloud.sns.core.SnsHeaderConverterUtil; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.converter.StringMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; + +/** + * Default implementation of {@link SnsPublishMessageConverter}. + * + * @author Matej Nedic + * @since 4.0.1 + */ +public class DefaultSnsPublishMessageConverter implements SnsPublishMessageConverter { + + private final MessageConverter messageConverter; + + public DefaultSnsPublishMessageConverter() { + this(null); + } + + public DefaultSnsPublishMessageConverter(@Nullable MessageConverter messageConverter) { + this.messageConverter = initMessageConverter(messageConverter); + } + + @Override + public PublishRequestMessagePair convert(Message originalMessage) { + PublishRequest.Builder publishRequest = PublishRequest.builder(); + populateHeaders(publishRequest, originalMessage); + + Message convertedMessage = messageConverter.toMessage( + originalMessage.getPayload(), + originalMessage.getHeaders() + ); + publishRequest.message(convertedMessage.getPayload().toString()); + + return new PublishRequestMessagePair<>(publishRequest.build(), originalMessage); + } + + @Override + public PublishRequestMessagePair convert(T payload, Map headers) { + Message originalMessage = MessageBuilder + .withPayload(payload) + .copyHeaders(headers) + .build(); + + return convert(originalMessage); + } + + private void populateHeaders(PublishRequest.Builder publishRequest, Message message) { + Map messageAttributes = SnsHeaderConverterUtil.toSnsMessageAttributes(message); + if (!messageAttributes.isEmpty()) { + publishRequest.messageAttributes(messageAttributes); + } + + Optional.ofNullable(message.getHeaders().get(NOTIFICATION_SUBJECT_HEADER, String.class)) + .ifPresent(publishRequest::subject); + + Optional.ofNullable(message.getHeaders().get(MESSAGE_GROUP_ID_HEADER, String.class)) + .ifPresent(publishRequest::messageGroupId); + + Optional.ofNullable(message.getHeaders().get(MESSAGE_DEDUPLICATION_ID_HEADER, String.class)) + .ifPresent(publishRequest::messageDeduplicationId); + } + + + private static CompositeMessageConverter initMessageConverter(@Nullable MessageConverter messageConverter) { + List converters = new ArrayList<>(); + + StringMessageConverter stringMessageConverter = new StringMessageConverter(); + stringMessageConverter.setSerializedPayloadClass(String.class); + converters.add(stringMessageConverter); + + if (messageConverter != null) { + converters.add(messageConverter); + } + + return new CompositeMessageConverter(converters); + } +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java new file mode 100644 index 0000000000..a73a2d6759 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java @@ -0,0 +1,19 @@ +package io.awspring.cloud.sns.core.async; + +import org.springframework.messaging.Message; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +/** + * Pair containing a PublishRequest and the original Spring Message. + * + * @param publishRequest the AWS SNS publish request + * @param originalMessage the original Spring message + * @param the message payload type + * @author Matej Nedic + * @since 4.0.1 + */ +public record PublishRequestMessagePair( + PublishRequest publishRequest, + Message originalMessage +) { +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java new file mode 100644 index 0000000000..0bb0e53cea --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java @@ -0,0 +1,103 @@ +package io.awspring.cloud.sns.core.async; + +import io.awspring.cloud.sns.core.SnsNotification; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.core.MessagePostProcessor; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * High level asynchronous SNS operations. + * + * @author Matej Nedic + * @since 4.0.1 + */ +public interface SnsAsyncOperations { + + /** + * Sends a message to a destination. + * + * @param destination the destination name + * @param message the message to send + * @param the message payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> send(String destination, Message message) ; + + /** + * Converts and sends a payload to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload) ; + + /** + * Converts and sends a payload with headers to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param headers the headers to include + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) + ; + + /** + * Converts and sends a payload with a post processor to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param postProcessor the post processor to apply + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) + ; + + /** + * Converts and sends a payload with headers and a post processor to a destination. + * + * @param destination the destination name + * @param payload the payload to send + * @param headers the headers to include + * @param postProcessor the post processor to apply + * @param the payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, + @Nullable MessagePostProcessor postProcessor); + + /** + * Sends a notification with a message and subject to a destination. + * + * @param destinationName the destination name + * @param message the message to send + * @param subject the subject (can be null) + * @return a CompletableFuture with the result + */ + CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject); + + /** + * Sends a notification to a topic. + * + * @param topic the topic name + * @param notification the notification to send + * @param the notification payload type + * @return a CompletableFuture with the result + */ + CompletableFuture> sendNotification(String topic, SnsNotification notification); + + /** + * Checks if a topic with the given ARN exists. + * + * @param topicArn the ARN of the topic + * @return a CompletableFuture with true if the topic exists, false otherwise + */ + CompletableFuture topicExists(String topicArn); +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java new file mode 100644 index 0000000000..6678193cbc --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java @@ -0,0 +1,128 @@ +package io.awspring.cloud.sns.core.async; + +import io.awspring.cloud.sns.core.CachingTopicArnResolver; +import io.awspring.cloud.sns.core.SnsAsyncTopicArnResolver; +import io.awspring.cloud.sns.core.SnsNotification; +import io.awspring.cloud.sns.core.TopicArnResolver; +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.NotFoundException; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; + +/** + * Asynchronous template for SNS operations. + * + * @author Matej Nedic + * @since 4.0.1 + */ +public class SnsAsyncTemplate implements SnsAsyncOperations { + + private final SnsAsyncClient snsAsyncClient; + private final TopicArnResolver topicArnResolver; + private final SnsPublishMessageConverter snsPublishMessageConverter; + + public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient) { + this(snsAsyncClient, null); + } + + public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, @Nullable SnsPublishMessageConverter messageConverter) { + this(snsAsyncClient, new CachingTopicArnResolver(new SnsAsyncTopicArnResolver(snsAsyncClient)), messageConverter); + } + + public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, TopicArnResolver topicArnResolver, + @Nullable SnsPublishMessageConverter messageConverter) { + Assert.notNull(snsAsyncClient, "SnsAsyncClient must not be null"); + Assert.notNull(topicArnResolver, "TopicArnResolver must not be null"); + this.topicArnResolver = topicArnResolver; + this.snsAsyncClient = snsAsyncClient; + this.snsPublishMessageConverter = messageConverter == null ? new DefaultSnsPublishMessageConverter() : messageConverter; + } + + @Override + public CompletableFuture> send(String destination, Message message) { + Arn topicArn = topicArnResolver.resolveTopicArn(destination); + + PublishRequestMessagePair pair = snsPublishMessageConverter.convert(message); + PublishRequest request = pair.publishRequest() + .toBuilder() + .topicArn(topicArn.toString()) + .build(); + + return publish(request, pair.originalMessage()); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload) { + return convertAndSend(destination, payload, null, null); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) { + return convertAndSend(destination, payload, headers, null); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) { + return convertAndSend(destination, payload, null, postProcessor); + } + + @Override + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, @Nullable MessagePostProcessor postProcessor) { + Message message = MessageBuilder + .withPayload(payload) + .copyHeaders(headers != null ? headers : Collections.emptyMap()) + .build(); + + if (postProcessor != null) { + message = (Message) postProcessor.postProcessMessage(message); + } + + return send(destination, message); + } + + @Override + public CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject) { + return convertAndSend(destinationName, message, Collections.singletonMap(NOTIFICATION_SUBJECT_HEADER, subject)); + } + + @Override + public CompletableFuture> sendNotification(String topic, SnsNotification notification) { + return convertAndSend(topic, notification.getPayload(), notification.getHeaders()); + } + + @Override + public CompletableFuture topicExists(String topicArn) { + Assert.notNull(topicArn, "topicArn must not be null"); + return snsAsyncClient.getTopicAttributes(request -> request.topicArn(topicArn)) + .thenApply(response -> true) + .exceptionally(throwable -> { + if (throwable.getCause() instanceof NotFoundException) { + return false; + } + if (throwable.getCause() instanceof RuntimeException re) { + throw re; + } + throw new RuntimeException("Unexpected exception", throwable); + }); + } + + private CompletableFuture> publish(PublishRequest request, Message originalMessage) { + return snsAsyncClient.publish(request) + .thenApply(response -> new SnsResult<>( + originalMessage, + response.messageId(), + response.sequenceNumber() + )); + } +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java new file mode 100644 index 0000000000..e60dec68c4 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java @@ -0,0 +1,33 @@ +package io.awspring.cloud.sns.core.async; + +import org.springframework.messaging.Message; + +import java.util.Map; + +/** + * Converter for transforming Spring messages to SNS PublishRequest. + * + * @author Matej Nedic + * @since 4.0.1 + */ +public interface SnsPublishMessageConverter { + + /** + * Converts a Spring Message to a PublishRequest. + * + * @param message the message to convert + * @param the message payload type + * @return a pair containing the PublishRequest and the original message + */ + PublishRequestMessagePair convert(Message message); + + /** + * Converts a payload and headers to a PublishRequest. + * + * @param payload the payload to convert + * @param headers the headers to include + * @param the payload type + * @return a pair containing the PublishRequest and the constructed message + */ + PublishRequestMessagePair convert(T payload, Map headers); +} diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java new file mode 100644 index 0000000000..7d7236c985 --- /dev/null +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java @@ -0,0 +1,21 @@ +package io.awspring.cloud.sns.core.async; + +import org.jspecify.annotations.Nullable; +import org.springframework.messaging.Message; + +/** + * Result of an SNS operation. + * + * @param message the original message that was sent + * @param messageId the SNS message ID returned by AWS + * @param sequenceNumber the sequence number for FIFO topics (null for standard topics) + * @param the message payload type + * @author Matej Nedic + * @since 4.0.1 + */ +public record SnsResult( + Message message, + String messageId, + @Nullable String sequenceNumber +) { +} diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java new file mode 100644 index 0000000000..7ca539ae93 --- /dev/null +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java @@ -0,0 +1,147 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sns.core.async; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.sns.Person; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.JacksonJsonMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; + +/** + * Unit tests for {@link DefaultSnsPublishMessageConverter}. + * + * @author Matej Nedic + * @since 4.0.1 + */ +class DefaultSnsPublishMessageConverterTest { + + private DefaultSnsPublishMessageConverter converter; + + @BeforeEach + void setUp() { + JacksonJsonMessageConverter messageConverter = new JacksonJsonMessageConverter(); + messageConverter.setSerializedPayloadClass(String.class); + converter = new DefaultSnsPublishMessageConverter(messageConverter); + } + + @Test + void convert_withMessage_returnsPublishRequestAndOriginalMessage() { + Message message = MessageBuilder + .withPayload("test payload") + .setHeader("custom-header", "custom-value") + .build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest()).isNotNull(); + assertThat(result.publishRequest().message()).isEqualTo("test payload"); + assertThat(result.originalMessage()).isEqualTo(message); + assertThat(result.originalMessage().getPayload()).isEqualTo("test payload"); + } + + @Test + void convert_withPayloadAndHeaders_buildsMessageAndReturnsIt() { + Map headers = new HashMap<>(); + headers.put("custom-header", "custom-value"); + headers.put(NOTIFICATION_SUBJECT_HEADER, "Test Subject"); + + PublishRequestMessagePair result = converter.convert("test payload", headers); + + assertThat(result.publishRequest()).isNotNull(); + assertThat(result.publishRequest().message()).isEqualTo("test payload"); + assertThat(result.publishRequest().subject()).isEqualTo("Test Subject"); + assertThat(result.publishRequest().messageAttributes().get("custom-header")).isEqualTo(MessageAttributeValue.builder().stringValue("custom-value").build()); + assertThat(result.originalMessage()).isNotNull(); + assertThat(result.originalMessage().getPayload()).isEqualTo("test payload"); + assertThat(result.originalMessage().getHeaders().get("custom-header")).isEqualTo("custom-value"); + assertThat(result.originalMessage().getHeaders().get(NOTIFICATION_SUBJECT_HEADER)).isEqualTo("Test Subject"); + } + + @Test + void convert_withSubjectHeader_setsSubjectInPublishRequest() { + Message message = MessageBuilder + .withPayload("test payload") + .setHeader(NOTIFICATION_SUBJECT_HEADER, "My Subject") + .build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().subject()).isEqualTo("My Subject"); + } + + @Test + void convert_withFifoHeaders_setsFifoAttributesInPublishRequest() { + Message message = MessageBuilder + .withPayload("test payload") + .setHeader(MESSAGE_GROUP_ID_HEADER, "group-123") + .setHeader(MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-456") + .build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().messageGroupId()).isEqualTo("group-123"); + assertThat(result.publishRequest().messageDeduplicationId()).isEqualTo("dedup-456"); + } + + @Test + void convert_withMessageAttributes_setsMessageAttributesInPublishRequest() { + Message message = MessageBuilder + .withPayload("test payload") + .setHeader("string-attr", "value") + .setHeader("number-attr", 42) + .build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().messageAttributes()).isNotEmpty(); + assertThat(result.publishRequest().messageAttributes()).containsKey("string-attr"); + assertThat(result.publishRequest().messageAttributes()).containsKey("number-attr"); + } + + @Test + void convert_withComplexObject_serializesPayload() { + Person testObject = new Person("test"); + Message message = MessageBuilder + .withPayload(testObject) + .build(); + + PublishRequestMessagePair result = converter.convert(message); + + assertThat(result.publishRequest().message()).contains("test"); + assertThat(result.originalMessage().getPayload()).isEqualTo(testObject); + } + + @Test + void convert_withEmptyHeaders_createsValidPublishRequest() { + PublishRequestMessagePair result = converter.convert("payload", Map.of()); + + assertThat(result.publishRequest()).isNotNull(); + assertThat(result.publishRequest().message()).isEqualTo("payload"); + assertThat(result.originalMessage()).isNotNull(); + assertThat(result.originalMessage().getPayload()).isEqualTo("payload"); + } + +} diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java new file mode 100644 index 0000000000..ed81a2f984 --- /dev/null +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -0,0 +1,303 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sns.core.async; + +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER; +import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.awspring.cloud.sns.Person; +import io.awspring.cloud.sns.core.SnsNotification; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import net.bytebuddy.utility.RandomString; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.converter.JacksonJsonMessageConverter; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sns.SnsAsyncClient; +import software.amazon.awssdk.services.sns.model.CreateTopicRequest; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import tools.jackson.databind.JsonNode; +import tools.jackson.databind.json.JsonMapper; + +/** + * Integration tests for {@link SnsAsyncTemplate}. + * + * @author Matej Nedic + * @since 4.0.1 + */ +@Testcontainers +class SnsAsyncTemplateIntegrationTest { + + private static final String TOPIC_NAME = "async_topic_name"; + private static SnsAsyncTemplate snsAsyncTemplate; + private static SnsAsyncClient snsAsyncClient; + private static SqsClient sqsClient; + + private final JsonMapper jsonMapper = new JsonMapper(); + + @Container + static LocalStackContainer localstack = new LocalStackContainer( + DockerImageName.parse("localstack/localstack:latest")); + + @BeforeAll + public static void createSnsAsyncTemplate() { + snsAsyncClient = SnsAsyncClient.builder() + .endpointOverride(localstack.getEndpoint()) + .region(Region.of(localstack.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) + .build(); + + sqsClient = SqsClient.builder() + .endpointOverride(localstack.getEndpoint()) + .region(Region.of(localstack.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) + .build(); + + JacksonJsonMessageConverter jacksonJsonConverter = new JacksonJsonMessageConverter(); + jacksonJsonConverter.setSerializedPayloadClass(String.class); + + DefaultSnsPublishMessageConverter converter = new DefaultSnsPublishMessageConverter(jacksonJsonConverter); + snsAsyncTemplate = new SnsAsyncTemplate(snsAsyncClient, converter); + } + + @Nested + class FifoTopics { + private static String queueUrl; + private static String queueArn; + + @BeforeAll + public static void init() { + queueUrl = sqsClient + .createQueue(r -> r.queueName("async-queue.fifo") + .attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))) + .queueUrl(); + queueArn = sqsClient + .getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .attributes().get(QueueAttributeName.QUEUE_ARN); + } + + @Test + void convertAndSend_validTextMessage_usesFifoTopic_sendsToSqs() throws Exception { + String topicName = "async_fifo_topic.fifo"; + Map topicAttributes = new HashMap<>(); + topicAttributes.put("FifoTopic", String.valueOf(true)); + String topicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name(topicName).attributes(topicAttributes).build()) + .get() + .topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); + + Map headers = new HashMap<>(); + headers.put(MESSAGE_GROUP_ID_HEADER, "group-id"); + headers.put(MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-id"); + headers.put("custom-fifo-header", "fifo-value"); + CompletableFuture> result = snsAsyncTemplate.convertAndSend( + topicName, + "async message", + headers + ); + + SnsResult snsResult = result.get(); + assertThat(snsResult.messageId()).isNotNull(); + assertThat(snsResult.message().getPayload()).isEqualTo("async message"); + assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", MESSAGE_GROUP_ID_HEADER, MESSAGE_DEDUPLICATION_ID_HEADER, "custom-fifo-header"); + assertThat(snsResult.message().getHeaders().get(MESSAGE_GROUP_ID_HEADER)).isEqualTo("group-id"); + assertThat(snsResult.message().getHeaders().get(MESSAGE_DEDUPLICATION_ID_HEADER)).isEqualTo("dedup-id"); + + await().untilAsserted(() -> { + ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); + assertThat(response.hasMessages()).isTrue(); + JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); + assertThat(body.get("Message").asString()).isEqualTo("async message"); + }); + } + + @AfterEach + public void purgeQueue() { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); + } + } + + @Nested + class NonFifoTopics { + private static String queueUrl; + private static String queueArn; + + @BeforeAll + public static void init() { + queueUrl = sqsClient.createQueue(r -> r.queueName("async-standard-queue")).queueUrl(); + queueArn = sqsClient + .getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .attributes().get(QueueAttributeName.QUEUE_ARN); + } + + @Test + void convertAndSend_validTextMessage_sendsToStandardTopic() throws Exception { + String topicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME).build()) + .get() + .topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); + + Map headers = new HashMap<>(); + headers.put("custom-header", "custom-value"); + headers.put("number-header", 42); + CompletableFuture> result = snsAsyncTemplate.convertAndSend(topicArn, "async message", headers); + + SnsResult snsResult = result.get(); + assertThat(snsResult.messageId()).isNotNull(); + assertThat(snsResult.sequenceNumber()).isNull(); + assertThat(snsResult.message().getPayload()).isEqualTo("async message"); + assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", "custom-header", "number-header"); + assertThat(snsResult.message().getHeaders().get("custom-header")).isEqualTo("custom-value"); + assertThat(snsResult.message().getHeaders().get("number-header")).isEqualTo(42); + + await().untilAsserted(() -> { + ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); + assertThat(response.hasMessages()).isTrue(); + JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); + assertThat(body.get("Message").asString()).isEqualTo("async message"); + }); + } + + @Test + void convertAndSend_validPersonObject_sendsToStandardTopic() throws Exception { + String topicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME + "_person").build()) + .get() + .topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); + + Person person = new Person("John Doe"); + Map headers = new HashMap<>(); + headers.put("person-type", "employee"); + headers.put(NOTIFICATION_SUBJECT_HEADER, "Person Update"); + CompletableFuture> result = snsAsyncTemplate.convertAndSend(topicArn, person, headers); + + SnsResult snsResult = result.get(); + assertThat(snsResult.messageId()).isNotNull(); + assertThat(snsResult.message().getPayload()).isEqualTo(person); + assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", "person-type", NOTIFICATION_SUBJECT_HEADER); + assertThat(snsResult.message().getHeaders().get("person-type")).isEqualTo("employee"); + + await().untilAsserted(() -> { + ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); + assertThat(response.hasMessages()).isTrue(); + Person receivedPerson = jsonMapper.readValue( + jsonMapper.readTree(response.messages().get(0).body()).get("Message").asString(), + Person.class + ); + assertThat(receivedPerson.getName()).isEqualTo("John Doe"); + }); + } + + @Test + void sendNotification_withSubject_sendsMessageWithSubject() throws Exception { + String topicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME + "_subject").build()) + .get() + .topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); + + CompletableFuture> result = snsAsyncTemplate.sendNotification( + topicArn, + "message with subject", + "Test Subject" + ); + + SnsResult snsResult = result.get(); + assertThat(snsResult.messageId()).isNotNull(); + + await().untilAsserted(() -> { + ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); + assertThat(response.hasMessages()).isTrue(); + JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); + assertThat(body.get("Message").asString()).isEqualTo("message with subject"); + assertThat(body.get("Subject").asString()).isEqualTo("Test Subject"); + }); + } + + @Test + void sendNotification_withSnsNotification_sendsMessageWithHeaders() throws Exception { + String topicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME + "_notification").build()) + .get() + .topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); + + SnsNotification notification = SnsNotification.builder("notification payload") + .header(NOTIFICATION_SUBJECT_HEADER, "Notification Subject") + .header("notification-type", "alert") + .header("priority", 1) + .build(); + + CompletableFuture> result = snsAsyncTemplate.sendNotification(topicArn, notification); + + SnsResult snsResult = result.get(); + assertThat(snsResult.messageId()).isNotNull(); + assertThat(snsResult.message().getPayload()).isEqualTo("notification payload"); + assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", NOTIFICATION_SUBJECT_HEADER, "notification-type", "priority"); + + await().untilAsserted(() -> { + ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); + assertThat(response.hasMessages()).isTrue(); + JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); + assertThat(body.get("Message").asString()).isEqualTo("notification payload"); + assertThat(body.get("Subject").asString()).isEqualTo("Notification Subject"); + }); + } + + @AfterEach + public void purgeQueue() { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); + } + } + + @Test + void topicExists_shouldReturnFalseForNonExistingTopic() throws Exception { + String nonExistentTopicArn = "arn:aws:sns:us-east-1:000000000000:nope"; + + CompletableFuture result = snsAsyncTemplate.topicExists(nonExistentTopicArn); + + assertThat(result.get()).isFalse(); + } + + @Test + void topicExists_shouldReturnTrueForExistingTopic() throws Exception { + String topicName = RandomString.make(); + var topicArn = snsAsyncClient.createTopic(request -> request.name(topicName)).get().topicArn(); + + CompletableFuture result = snsAsyncTemplate.topicExists(topicArn); + + assertThat(result.get()).isTrue(); + } +} From 4187c2e020a04683dedbee35845bdf00012a6bdf Mon Sep 17 00:00:00 2001 From: matejnedic Date: Sat, 7 Feb 2026 00:02:23 +0100 Subject: [PATCH 02/10] Update version --- .../cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java index ed81a2f984..6ca18d071b 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -66,7 +66,7 @@ class SnsAsyncTemplateIntegrationTest { @Container static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:latest")); + DockerImageName.parse("localstack/localstack:4.4.0")); @BeforeAll public static void createSnsAsyncTemplate() { From 917227c5b8d27950f2588fd051f4b4e81463d427 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Sat, 7 Feb 2026 00:04:43 +0100 Subject: [PATCH 03/10] Update version --- .../sns/core/async/DefaultSnsPublishMessageConverterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java index 7ca539ae93..0eaa9137b7 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java @@ -73,7 +73,6 @@ void convert_withPayloadAndHeaders_buildsMessageAndReturnsIt() { assertThat(result.publishRequest()).isNotNull(); assertThat(result.publishRequest().message()).isEqualTo("test payload"); assertThat(result.publishRequest().subject()).isEqualTo("Test Subject"); - assertThat(result.publishRequest().messageAttributes().get("custom-header")).isEqualTo(MessageAttributeValue.builder().stringValue("custom-value").build()); assertThat(result.originalMessage()).isNotNull(); assertThat(result.originalMessage().getPayload()).isEqualTo("test payload"); assertThat(result.originalMessage().getHeaders().get("custom-header")).isEqualTo("custom-value"); From 87008933128f4530d5de43a0a689f6b13068c738 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Mon, 9 Feb 2026 20:02:02 +0100 Subject: [PATCH 04/10] Refactor --- .../sns/core/SnsHeaderConverterUtil.java | 5 +- .../DefaultSnsPublishMessageConverter.java | 6 + .../sns/core/async/SnsAsyncTemplate.java | 22 ++ ...DefaultSnsPublishMessageConverterTest.java | 104 ++--- .../SnsAsyncTemplateIntegrationTest.java | 363 ++++++++---------- 5 files changed, 230 insertions(+), 270 deletions(-) diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java index a50b02b4b7..4eb67786fb 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java @@ -22,6 +22,9 @@ import software.amazon.awssdk.services.sns.model.MessageAttributeValue; /** + * + * Util class to convert header map to SNS request. + * * @author Agim Emruli * @author Alain Sahli * @author Gyozo Papp @@ -29,7 +32,7 @@ */ public class SnsHeaderConverterUtil { private static final Log logger = LogFactory.getLog(SnsHeaderConverterUtil.class); - public static JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create(); + private static final JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create(); public static Map toSnsMessageAttributes(Message message) { HashMap messageAttributes = new HashMap<>(); diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java index e661092d6b..8da5cb560c 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java @@ -7,6 +7,7 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; import software.amazon.awssdk.services.sns.model.MessageAttributeValue; import software.amazon.awssdk.services.sns.model.PublishRequest; @@ -39,6 +40,8 @@ public DefaultSnsPublishMessageConverter(@Nullable MessageConverter messageConve @Override public PublishRequestMessagePair convert(Message originalMessage) { + Assert.notNull(originalMessage, "message cannot be null"); + PublishRequest.Builder publishRequest = PublishRequest.builder(); populateHeaders(publishRequest, originalMessage); @@ -53,6 +56,9 @@ public PublishRequestMessagePair convert(Message originalMessage) { @Override public PublishRequestMessagePair convert(T payload, Map headers) { + Assert.notNull(payload, "payload cannot be null"); + Assert.notNull(headers, "headers cannot be null"); + Message originalMessage = MessageBuilder .withPayload(payload) .copyHeaders(headers) diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java index 6678193cbc..f68a456e01 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java @@ -51,6 +51,9 @@ public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, TopicArnResolver topicArn @Override public CompletableFuture> send(String destination, Message message) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(message, "message cannot be null"); + Arn topicArn = topicArnResolver.resolveTopicArn(destination); PublishRequestMessagePair pair = snsPublishMessageConverter.convert(message); @@ -64,21 +67,33 @@ public CompletableFuture> send(String destination, Message m @Override public CompletableFuture> convertAndSend(String destination, T payload) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + return convertAndSend(destination, payload, null, null); } @Override public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + return convertAndSend(destination, payload, headers, null); } @Override public CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + return convertAndSend(destination, payload, null, postProcessor); } @Override public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, @Nullable MessagePostProcessor postProcessor) { + Assert.notNull(destination, "destination cannot be null"); + Assert.notNull(payload, "payload cannot be null"); + Message message = MessageBuilder .withPayload(payload) .copyHeaders(headers != null ? headers : Collections.emptyMap()) @@ -93,17 +108,24 @@ public CompletableFuture> convertAndSend(String destination, T @Override public CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject) { + Assert.notNull(destinationName, "destinationName cannot be null"); + Assert.notNull(message, "message cannot be null"); + return convertAndSend(destinationName, message, Collections.singletonMap(NOTIFICATION_SUBJECT_HEADER, subject)); } @Override public CompletableFuture> sendNotification(String topic, SnsNotification notification) { + Assert.notNull(topic, "topic cannot be null"); + Assert.notNull(notification, "notification cannot be null"); + return convertAndSend(topic, notification.getPayload(), notification.getHeaders()); } @Override public CompletableFuture topicExists(String topicArn) { Assert.notNull(topicArn, "topicArn must not be null"); + return snsAsyncClient.getTopicAttributes(request -> request.topicArn(topicArn)) .thenApply(response -> true) .exceptionally(throwable -> { diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java index 0eaa9137b7..43dd10aca8 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java @@ -21,70 +21,54 @@ import static org.assertj.core.api.Assertions.assertThat; import io.awspring.cloud.sns.Person; -import java.util.HashMap; import java.util.Map; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.messaging.Message; import org.springframework.messaging.converter.JacksonJsonMessageConverter; import org.springframework.messaging.support.MessageBuilder; -import software.amazon.awssdk.services.sns.model.MessageAttributeValue; +import tools.jackson.databind.json.JsonMapper; /** - * Unit tests for {@link DefaultSnsPublishMessageConverter}. + * Tests for {@link DefaultSnsPublishMessageConverter}. * * @author Matej Nedic * @since 4.0.1 */ class DefaultSnsPublishMessageConverterTest { - private DefaultSnsPublishMessageConverter converter; + private final DefaultSnsPublishMessageConverter converter = createConverter(); - @BeforeEach - void setUp() { - JacksonJsonMessageConverter messageConverter = new JacksonJsonMessageConverter(); - messageConverter.setSerializedPayloadClass(String.class); - converter = new DefaultSnsPublishMessageConverter(messageConverter); + private static DefaultSnsPublishMessageConverter createConverter() { + JacksonJsonMessageConverter jackson = new JacksonJsonMessageConverter(new JsonMapper()); + jackson.setSerializedPayloadClass(String.class); + return new DefaultSnsPublishMessageConverter(jackson); } @Test - void convert_withMessage_returnsPublishRequestAndOriginalMessage() { - Message message = MessageBuilder - .withPayload("test payload") - .setHeader("custom-header", "custom-value") - .build(); + void convertsStringPayload() { + Message message = MessageBuilder.withPayload("hello").build(); PublishRequestMessagePair result = converter.convert(message); - assertThat(result.publishRequest()).isNotNull(); - assertThat(result.publishRequest().message()).isEqualTo("test payload"); + assertThat(result.publishRequest().message()).isEqualTo("hello"); assertThat(result.originalMessage()).isEqualTo(message); - assertThat(result.originalMessage().getPayload()).isEqualTo("test payload"); } @Test - void convert_withPayloadAndHeaders_buildsMessageAndReturnsIt() { - Map headers = new HashMap<>(); - headers.put("custom-header", "custom-value"); - headers.put(NOTIFICATION_SUBJECT_HEADER, "Test Subject"); - - PublishRequestMessagePair result = converter.convert("test payload", headers); - - assertThat(result.publishRequest()).isNotNull(); - assertThat(result.publishRequest().message()).isEqualTo("test payload"); - assertThat(result.publishRequest().subject()).isEqualTo("Test Subject"); - assertThat(result.originalMessage()).isNotNull(); - assertThat(result.originalMessage().getPayload()).isEqualTo("test payload"); - assertThat(result.originalMessage().getHeaders().get("custom-header")).isEqualTo("custom-value"); - assertThat(result.originalMessage().getHeaders().get(NOTIFICATION_SUBJECT_HEADER)).isEqualTo("Test Subject"); + void convertsPayloadWithHeaders() { + PublishRequestMessagePair result = converter.convert("payload", + Map.of("custom", "value", NOTIFICATION_SUBJECT_HEADER, "Subject")); + + assertThat(result.publishRequest().message()).isEqualTo("payload"); + assertThat(result.publishRequest().subject()).isEqualTo("Subject"); + assertThat(result.originalMessage().getPayload()).isEqualTo("payload"); + assertThat(result.originalMessage().getHeaders()).containsEntry("custom", "value"); } @Test - void convert_withSubjectHeader_setsSubjectInPublishRequest() { - Message message = MessageBuilder - .withPayload("test payload") - .setHeader(NOTIFICATION_SUBJECT_HEADER, "My Subject") - .build(); + void setsSubjectFromHeader() { + Message message = MessageBuilder.withPayload("test") + .setHeader(NOTIFICATION_SUBJECT_HEADER, "My Subject").build(); PublishRequestMessagePair result = converter.convert(message); @@ -92,55 +76,45 @@ void convert_withSubjectHeader_setsSubjectInPublishRequest() { } @Test - void convert_withFifoHeaders_setsFifoAttributesInPublishRequest() { - Message message = MessageBuilder - .withPayload("test payload") - .setHeader(MESSAGE_GROUP_ID_HEADER, "group-123") - .setHeader(MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-456") - .build(); + void setsFifoHeaders() { + Message message = MessageBuilder.withPayload("fifo") + .setHeader(MESSAGE_GROUP_ID_HEADER, "grp-1") + .setHeader(MESSAGE_DEDUPLICATION_ID_HEADER, "ded-1").build(); PublishRequestMessagePair result = converter.convert(message); - assertThat(result.publishRequest().messageGroupId()).isEqualTo("group-123"); - assertThat(result.publishRequest().messageDeduplicationId()).isEqualTo("dedup-456"); + assertThat(result.publishRequest().messageGroupId()).isEqualTo("grp-1"); + assertThat(result.publishRequest().messageDeduplicationId()).isEqualTo("ded-1"); } @Test - void convert_withMessageAttributes_setsMessageAttributesInPublishRequest() { - Message message = MessageBuilder - .withPayload("test payload") - .setHeader("string-attr", "value") - .setHeader("number-attr", 42) - .build(); + void convertsCustomHeadersToMessageAttributes() { + Message message = MessageBuilder.withPayload("test") + .setHeader("priority", "high") + .setHeader("count", 42).build(); PublishRequestMessagePair result = converter.convert(message); - assertThat(result.publishRequest().messageAttributes()).isNotEmpty(); - assertThat(result.publishRequest().messageAttributes()).containsKey("string-attr"); - assertThat(result.publishRequest().messageAttributes()).containsKey("number-attr"); + assertThat(result.publishRequest().messageAttributes()).containsKey("priority"); + assertThat(result.publishRequest().messageAttributes()).containsKey("count"); } @Test - void convert_withComplexObject_serializesPayload() { - Person testObject = new Person("test"); - Message message = MessageBuilder - .withPayload(testObject) - .build(); + void serializesComplexPayload() { + Person person = new Person("John"); + Message message = MessageBuilder.withPayload(person).build(); PublishRequestMessagePair result = converter.convert(message); - assertThat(result.publishRequest().message()).contains("test"); - assertThat(result.originalMessage().getPayload()).isEqualTo(testObject); + assertThat(result.publishRequest().message()).contains("John"); + assertThat(result.originalMessage().getPayload()).isEqualTo(person); } @Test - void convert_withEmptyHeaders_createsValidPublishRequest() { + void handlesEmptyHeaders() { PublishRequestMessagePair result = converter.convert("payload", Map.of()); - assertThat(result.publishRequest()).isNotNull(); assertThat(result.publishRequest().message()).isEqualTo("payload"); - assertThat(result.originalMessage()).isNotNull(); assertThat(result.originalMessage().getPayload()).isEqualTo("payload"); } - } diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java index 6ca18d071b..eaed577d96 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -23,9 +23,10 @@ import io.awspring.cloud.sns.Person; import io.awspring.cloud.sns.core.SnsNotification; -import java.util.HashMap; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import net.bytebuddy.utility.RandomString; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -44,12 +45,12 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import tools.jackson.databind.JsonNode; import tools.jackson.databind.json.JsonMapper; /** - * Integration tests for {@link SnsAsyncTemplate}. + * Integration tests for {@link SnsAsyncTemplate} using SNS → SQS subscriptions. * * @author Matej Nedic * @since 4.0.1 @@ -57,247 +58,201 @@ @Testcontainers class SnsAsyncTemplateIntegrationTest { - private static final String TOPIC_NAME = "async_topic_name"; - private static SnsAsyncTemplate snsAsyncTemplate; - private static SnsAsyncClient snsAsyncClient; - private static SqsClient sqsClient; - - private final JsonMapper jsonMapper = new JsonMapper(); + private static final JsonMapper jsonMapper = JsonMapper.builder().build(); @Container static LocalStackContainer localstack = new LocalStackContainer( DockerImageName.parse("localstack/localstack:4.4.0")); + private static SnsAsyncClient snsAsyncClient; + private static SqsClient sqsClient; + private static SnsAsyncTemplate snsAsyncTemplate; + private static String standardQueueUrl; + private static String standardTopicArn; + private static String fifoQueueUrl; + private static String fifoTopicArn; + @BeforeAll - public static void createSnsAsyncTemplate() { + static void setUp() throws Exception { + StaticCredentialsProvider credentials = StaticCredentialsProvider + .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); + snsAsyncClient = SnsAsyncClient.builder() .endpointOverride(localstack.getEndpoint()) .region(Region.of(localstack.getRegion())) - .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) - .build(); - + .credentialsProvider(credentials).build(); + sqsClient = SqsClient.builder() .endpointOverride(localstack.getEndpoint()) .region(Region.of(localstack.getRegion())) - .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) - .build(); - - JacksonJsonMessageConverter jacksonJsonConverter = new JacksonJsonMessageConverter(); - jacksonJsonConverter.setSerializedPayloadClass(String.class); - - DefaultSnsPublishMessageConverter converter = new DefaultSnsPublishMessageConverter(jacksonJsonConverter); - snsAsyncTemplate = new SnsAsyncTemplate(snsAsyncClient, converter); + .credentialsProvider(credentials).build(); + + standardQueueUrl = sqsClient.createQueue(r -> r.queueName("async-standard-queue")).queueUrl(); + String standardQueueArn = sqsClient + .getQueueAttributes(r -> r.queueUrl(standardQueueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .attributes().get(QueueAttributeName.QUEUE_ARN); + standardTopicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name("async-standard-topic").build()) + .get().topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(standardTopicArn).protocol("sqs").endpoint(standardQueueArn)).get(); + + + fifoQueueUrl = sqsClient + .createQueue(r -> r.queueName("async-fifo-queue.fifo") + .attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))) + .queueUrl(); + String fifoQueueArn = sqsClient + .getQueueAttributes(r -> r.queueUrl(fifoQueueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .attributes().get(QueueAttributeName.QUEUE_ARN); + fifoTopicArn = snsAsyncClient + .createTopic(CreateTopicRequest.builder().name("async-fifo-topic.fifo") + .attributes(Map.of("FifoTopic", "true", "ContentBasedDeduplication", "true")).build()) + .get().topicArn(); + snsAsyncClient.subscribe(r -> r.topicArn(fifoTopicArn).protocol("sqs").endpoint(fifoQueueArn)).get(); + + JacksonJsonMessageConverter jackson = new JacksonJsonMessageConverter(); + jackson.setSerializedPayloadClass(String.class); + snsAsyncTemplate = new SnsAsyncTemplate(snsAsyncClient, + new DefaultSnsPublishMessageConverter(jackson)); } - @Nested - class FifoTopics { - private static String queueUrl; - private static String queueArn; - - @BeforeAll - public static void init() { - queueUrl = sqsClient - .createQueue(r -> r.queueName("async-queue.fifo") - .attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))) - .queueUrl(); - queueArn = sqsClient - .getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) - .attributes().get(QueueAttributeName.QUEUE_ARN); - } - - @Test - void convertAndSend_validTextMessage_usesFifoTopic_sendsToSqs() throws Exception { - String topicName = "async_fifo_topic.fifo"; - Map topicAttributes = new HashMap<>(); - topicAttributes.put("FifoTopic", String.valueOf(true)); - String topicArn = snsAsyncClient - .createTopic(CreateTopicRequest.builder().name(topicName).attributes(topicAttributes).build()) - .get() - .topicArn(); - snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); - - Map headers = new HashMap<>(); - headers.put(MESSAGE_GROUP_ID_HEADER, "group-id"); - headers.put(MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-id"); - headers.put("custom-fifo-header", "fifo-value"); - CompletableFuture> result = snsAsyncTemplate.convertAndSend( - topicName, - "async message", - headers - ); - - SnsResult snsResult = result.get(); - assertThat(snsResult.messageId()).isNotNull(); - assertThat(snsResult.message().getPayload()).isEqualTo("async message"); - assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", MESSAGE_GROUP_ID_HEADER, MESSAGE_DEDUPLICATION_ID_HEADER, "custom-fifo-header"); - assertThat(snsResult.message().getHeaders().get(MESSAGE_GROUP_ID_HEADER)).isEqualTo("group-id"); - assertThat(snsResult.message().getHeaders().get(MESSAGE_DEDUPLICATION_ID_HEADER)).isEqualTo("dedup-id"); - - await().untilAsserted(() -> { - ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); - assertThat(response.hasMessages()).isTrue(); - JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); - assertThat(body.get("Message").asString()).isEqualTo("async message"); - }); - } - - @AfterEach - public void purgeQueue() { - sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); - } + private static List receiveAll(String queueUrl) { + List messages = new ArrayList<>(); + await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(500)).until(() -> { + var response = sqsClient.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl) + .maxNumberOfMessages(10).waitTimeSeconds(2).messageAttributeNames("All").build()); + if (response.hasMessages() && !response.messages().isEmpty()) { + messages.addAll(response.messages()); + } + return !messages.isEmpty(); + }); + return messages; } @Nested - class NonFifoTopics { - private static String queueUrl; - private static String queueArn; - - @BeforeAll - public static void init() { - queueUrl = sqsClient.createQueue(r -> r.queueName("async-standard-queue")).queueUrl(); - queueArn = sqsClient - .getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) - .attributes().get(QueueAttributeName.QUEUE_ARN); + class StandardTopicTests { + + @AfterEach + void purgeQueue() { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(standardQueueUrl).build()); } @Test - void convertAndSend_validTextMessage_sendsToStandardTopic() throws Exception { - String topicArn = snsAsyncClient - .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME).build()) - .get() - .topicArn(); - snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); - - Map headers = new HashMap<>(); - headers.put("custom-header", "custom-value"); - headers.put("number-header", 42); - CompletableFuture> result = snsAsyncTemplate.convertAndSend(topicArn, "async message", headers); - - SnsResult snsResult = result.get(); - assertThat(snsResult.messageId()).isNotNull(); - assertThat(snsResult.sequenceNumber()).isNull(); - assertThat(snsResult.message().getPayload()).isEqualTo("async message"); - assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", "custom-header", "number-header"); - assertThat(snsResult.message().getHeaders().get("custom-header")).isEqualTo("custom-value"); - assertThat(snsResult.message().getHeaders().get("number-header")).isEqualTo(42); - - await().untilAsserted(() -> { - ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); - assertThat(response.hasMessages()).isTrue(); - JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); - assertThat(body.get("Message").asString()).isEqualTo("async message"); - }); + void convertAndSendWithHeaders() throws Exception { + SnsResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, "async message", + Map.of("custom-header", "custom-value")).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.sequenceNumber()).isNull(); + assertThat(result.message().getPayload()).isEqualTo("async message"); + assertThat(result.message().getHeaders()).containsEntry("custom-header", "custom-value"); + + var received = receiveAll(standardQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + assertThat(snsEnvelope.get("Message").asString()).isEqualTo("async message"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("custom-header").get("Value").asString()).isEqualTo("custom-value"); } @Test - void convertAndSend_validPersonObject_sendsToStandardTopic() throws Exception { - String topicArn = snsAsyncClient - .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME + "_person").build()) - .get() - .topicArn(); - snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); - - Person person = new Person("John Doe"); - Map headers = new HashMap<>(); - headers.put("person-type", "employee"); - headers.put(NOTIFICATION_SUBJECT_HEADER, "Person Update"); - CompletableFuture> result = snsAsyncTemplate.convertAndSend(topicArn, person, headers); - - SnsResult snsResult = result.get(); - assertThat(snsResult.messageId()).isNotNull(); - assertThat(snsResult.message().getPayload()).isEqualTo(person); - assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", "person-type", NOTIFICATION_SUBJECT_HEADER); - assertThat(snsResult.message().getHeaders().get("person-type")).isEqualTo("employee"); - - await().untilAsserted(() -> { - ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); - assertThat(response.hasMessages()).isTrue(); - Person receivedPerson = jsonMapper.readValue( - jsonMapper.readTree(response.messages().get(0).body()).get("Message").asString(), - Person.class - ); - assertThat(receivedPerson.getName()).isEqualTo("John Doe"); - }); + void convertAndSendWithPersonPayload() throws Exception { + Person person = new Person("John"); + SnsResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, person, + Map.of("person-type", "employee", NOTIFICATION_SUBJECT_HEADER, "Person Update")).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.message().getPayload()).isEqualTo(person); + + var received = receiveAll(standardQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + Person receivedPerson = jsonMapper.readValue(snsEnvelope.get("Message").asString(), Person.class); + assertThat(receivedPerson.getName()).isEqualTo("John"); + assertThat(snsEnvelope.get("Subject").asString()).isEqualTo("Person Update"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("person-type").get("Value").asString()).isEqualTo("employee"); } @Test - void sendNotification_withSubject_sendsMessageWithSubject() throws Exception { - String topicArn = snsAsyncClient - .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME + "_subject").build()) - .get() - .topicArn(); - snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); - - CompletableFuture> result = snsAsyncTemplate.sendNotification( - topicArn, - "message with subject", - "Test Subject" - ); - - SnsResult snsResult = result.get(); - assertThat(snsResult.messageId()).isNotNull(); - - await().untilAsserted(() -> { - ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); - assertThat(response.hasMessages()).isTrue(); - JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); - assertThat(body.get("Message").asString()).isEqualTo("message with subject"); - assertThat(body.get("Subject").asString()).isEqualTo("Test Subject"); - }); + void sendNotificationWithSubject() throws Exception { + SnsResult result = snsAsyncTemplate.sendNotification(standardTopicArn, + "message with subject", "Test Subject").get(); + + assertThat(result.messageId()).isNotNull(); + + var received = receiveAll(standardQueueUrl); + JsonNode body = jsonMapper.readTree(received.get(0).body()); + assertThat(body.get("Message").asString()).isEqualTo("message with subject"); + assertThat(body.get("Subject").asString()).isEqualTo("Test Subject"); } @Test - void sendNotification_withSnsNotification_sendsMessageWithHeaders() throws Exception { - String topicArn = snsAsyncClient - .createTopic(CreateTopicRequest.builder().name(TOPIC_NAME + "_notification").build()) - .get() - .topicArn(); - snsAsyncClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)).get(); - + void sendNotificationWithSnsNotification() throws Exception { SnsNotification notification = SnsNotification.builder("notification payload") .header(NOTIFICATION_SUBJECT_HEADER, "Notification Subject") - .header("notification-type", "alert") - .header("priority", 1) - .build(); - - CompletableFuture> result = snsAsyncTemplate.sendNotification(topicArn, notification); - - SnsResult snsResult = result.get(); - assertThat(snsResult.messageId()).isNotNull(); - assertThat(snsResult.message().getPayload()).isEqualTo("notification payload"); - assertThat(snsResult.message().getHeaders()).containsKeys("id", "timestamp", NOTIFICATION_SUBJECT_HEADER, "notification-type", "priority"); - - await().untilAsserted(() -> { - ReceiveMessageResponse response = sqsClient.receiveMessage(r -> r.queueUrl(queueUrl)); - assertThat(response.hasMessages()).isTrue(); - JsonNode body = jsonMapper.readTree(response.messages().get(0).body()); - assertThat(body.get("Message").asString()).isEqualTo("notification payload"); - assertThat(body.get("Subject").asString()).isEqualTo("Notification Subject"); - }); - } + .header("notification-type", "alert").build(); - @AfterEach - public void purgeQueue() { - sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); + SnsResult result = snsAsyncTemplate.sendNotification(standardTopicArn, notification).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.message().getPayload()).isEqualTo("notification payload"); + + var received = receiveAll(standardQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + assertThat(snsEnvelope.get("Message").asString()).isEqualTo("notification payload"); + assertThat(snsEnvelope.get("Subject").asString()).isEqualTo("Notification Subject"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("notification-type").get("Value").asString()).isEqualTo("alert"); } } - @Test - void topicExists_shouldReturnFalseForNonExistingTopic() throws Exception { - String nonExistentTopicArn = "arn:aws:sns:us-east-1:000000000000:nope"; + @Nested + class FifoTopicTests { - CompletableFuture result = snsAsyncTemplate.topicExists(nonExistentTopicArn); + @AfterEach + void purgeQueue() { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(fifoQueueUrl).build()); + } - assertThat(result.get()).isFalse(); + @Test + void convertAndSendWithFifoHeaders() throws Exception { + SnsResult result = snsAsyncTemplate.convertAndSend(fifoTopicArn, "fifo message", + Map.of(MESSAGE_GROUP_ID_HEADER, "group-1", + MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-1", + "custom-fifo-header", "fifo-value")).get(); + + assertThat(result.messageId()).isNotNull(); + assertThat(result.message().getPayload()).isEqualTo("fifo message"); + assertThat(result.message().getHeaders()) + .containsEntry(MESSAGE_GROUP_ID_HEADER, "group-1") + .containsEntry(MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-1"); + + var received = receiveAll(fifoQueueUrl); + JsonNode snsEnvelope = jsonMapper.readTree(received.get(0).body()); + assertThat(snsEnvelope.get("Message").asString()).isEqualTo("fifo message"); + + JsonNode messageAttributes = snsEnvelope.get("MessageAttributes"); + assertThat(messageAttributes).isNotNull(); + assertThat(messageAttributes.get("custom-fifo-header").get("Value").asString()).isEqualTo("fifo-value"); + } } @Test - void topicExists_shouldReturnTrueForExistingTopic() throws Exception { - String topicName = RandomString.make(); - var topicArn = snsAsyncClient.createTopic(request -> request.name(topicName)).get().topicArn(); + void topicExistsReturnsTrueForExistingTopic() throws Exception { + String topicArn = snsAsyncClient + .createTopic(r -> r.name(RandomString.make())).get().topicArn(); - CompletableFuture result = snsAsyncTemplate.topicExists(topicArn); + assertThat(snsAsyncTemplate.topicExists(topicArn).get()).isTrue(); + } - assertThat(result.get()).isTrue(); + @Test + void topicExistsReturnsFalseForNonExistingTopic() throws Exception { + assertThat(snsAsyncTemplate.topicExists( + "arn:aws:sns:us-east-1:000000000000:nope").get()).isFalse(); } } From 06e9abccc9c7e3a7c9d0b5989df4e57a07873dbe Mon Sep 17 00:00:00 2001 From: matejnedic Date: Mon, 9 Feb 2026 20:02:22 +0100 Subject: [PATCH 05/10] rename --- .../cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java index eaed577d96..1232437b73 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -50,7 +50,7 @@ import tools.jackson.databind.json.JsonMapper; /** - * Integration tests for {@link SnsAsyncTemplate} using SNS → SQS subscriptions. + * Integration tests for {@link SnsAsyncTemplate} * * @author Matej Nedic * @since 4.0.1 From 1b5811912fd1c6bbbb15f08e58dd771033c2b90a Mon Sep 17 00:00:00 2001 From: matejnedic Date: Mon, 9 Feb 2026 20:07:57 +0100 Subject: [PATCH 06/10] Add java docs --- .../awspring/cloud/sns/core/async/SnsAsyncOperations.java | 6 ++---- .../awspring/cloud/sns/core/async/SnsAsyncTemplate.java | 8 ++++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java index 0bb0e53cea..45416564a1 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java @@ -45,8 +45,7 @@ public interface SnsAsyncOperations { * @param the payload type * @return a CompletableFuture with the result */ - CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) - ; + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers); /** * Converts and sends a payload with a post processor to a destination. @@ -57,8 +56,7 @@ CompletableFuture> convertAndSend(String destination, T payload * @param the payload type * @return a CompletableFuture with the result */ - CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) - ; + CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor); /** * Converts and sends a payload with headers and a post processor to a destination. diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java index f68a456e01..2890c63a41 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java @@ -49,6 +49,14 @@ public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, TopicArnResolver topicArn this.snsPublishMessageConverter = messageConverter == null ? new DefaultSnsPublishMessageConverter() : messageConverter; } + /** + * Sends a message to a destination. + * + * @param destination the destination name + * @param message the message to send + * @param the message payload type + * @return a CompletableFuture with the result + */ @Override public CompletableFuture> send(String destination, Message message) { Assert.notNull(destination, "destination cannot be null"); From 1334179a3b5ad7ce5331b6f8a515adaf519f4237 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Tue, 10 Feb 2026 22:04:47 +0100 Subject: [PATCH 07/10] Since 4.1.0 --- .../cloud/sns/core/async/DefaultSnsPublishMessageConverter.java | 2 +- .../cloud/sns/core/async/PublishRequestMessagePair.java | 2 +- .../io/awspring/cloud/sns/core/async/SnsAsyncOperations.java | 2 +- .../java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java | 2 +- .../cloud/sns/core/async/SnsPublishMessageConverter.java | 2 +- .../main/java/io/awspring/cloud/sns/core/async/SnsResult.java | 2 +- .../sns/core/async/DefaultSnsPublishMessageConverterTest.java | 2 +- .../cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java index 8da5cb560c..b36eec6eeb 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverter.java @@ -24,7 +24,7 @@ * Default implementation of {@link SnsPublishMessageConverter}. * * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ public class DefaultSnsPublishMessageConverter implements SnsPublishMessageConverter { diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java index a73a2d6759..0b18c62dfe 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java @@ -10,7 +10,7 @@ * @param originalMessage the original Spring message * @param the message payload type * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ public record PublishRequestMessagePair( PublishRequest publishRequest, diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java index 45416564a1..5175b182e3 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java @@ -12,7 +12,7 @@ * High level asynchronous SNS operations. * * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ public interface SnsAsyncOperations { diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java index 2890c63a41..8a26f40924 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java @@ -24,7 +24,7 @@ * Asynchronous template for SNS operations. * * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ public class SnsAsyncTemplate implements SnsAsyncOperations { diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java index e60dec68c4..c4423dcfb5 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsPublishMessageConverter.java @@ -8,7 +8,7 @@ * Converter for transforming Spring messages to SNS PublishRequest. * * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ public interface SnsPublishMessageConverter { diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java index 7d7236c985..326f774c27 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java @@ -11,7 +11,7 @@ * @param sequenceNumber the sequence number for FIFO topics (null for standard topics) * @param the message payload type * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ public record SnsResult( Message message, diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java index 43dd10aca8..ce2a9283d0 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/DefaultSnsPublishMessageConverterTest.java @@ -32,7 +32,7 @@ * Tests for {@link DefaultSnsPublishMessageConverter}. * * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ class DefaultSnsPublishMessageConverterTest { diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java index 1232437b73..2e058b44e6 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -53,7 +53,7 @@ * Integration tests for {@link SnsAsyncTemplate} * * @author Matej Nedic - * @since 4.0.1 + * @since 4.1.0 */ @Testcontainers class SnsAsyncTemplateIntegrationTest { From d2e9eb7a5643e2ea8767e9f1773a5f0e6fc98645 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 12 Feb 2026 20:57:24 +0100 Subject: [PATCH 08/10] Adjust comments --- docs/src/main/asciidoc/sns.adoc | 4 ++-- .../cloud/sns/core/SnsHeaderConverterUtil.java | 2 +- ...nsResult.java => PublishMessageResult.java} | 2 +- .../core/async/PublishRequestMessagePair.java | 2 +- .../sns/core/async/SnsAsyncOperations.java | 16 ++++++++-------- .../cloud/sns/core/async/SnsAsyncTemplate.java | 18 +++++++++--------- .../async/SnsAsyncTemplateIntegrationTest.java | 10 +++++----- 7 files changed, 27 insertions(+), 27 deletions(-) rename spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/{SnsResult.java => PublishMessageResult.java} (93%) diff --git a/docs/src/main/asciidoc/sns.adoc b/docs/src/main/asciidoc/sns.adoc index 53e67510c8..c6fef321d9 100644 --- a/docs/src/main/asciidoc/sns.adoc +++ b/docs/src/main/asciidoc/sns.adoc @@ -157,7 +157,7 @@ It supports the same payload types as `SnsTemplate` and uses the same Spring `Me [source,java] ---- import io.awspring.cloud.sns.core.async.SnsAsyncTemplate; -import io.awspring.cloud.sns.core.async.SnsResult; +import io.awspring.cloud.sns.core.async.PublishMessageResult; import java.util.concurrent.CompletableFuture; @@ -168,7 +168,7 @@ class NotificationService { this.snsAsyncTemplate = snsAsyncTemplate; } - CompletableFuture> sendNotification() { + CompletableFuture> sendNotification() { return snsAsyncTemplate.convertAndSend("topic-arn", "payload"); } } diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java index 4eb67786fb..2e8fb2c12c 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaderConverterUtil.java @@ -30,7 +30,7 @@ * @author Gyozo Papp * @author Matej Nedic */ -public class SnsHeaderConverterUtil { +class SnsHeaderConverterUtil { private static final Log logger = LogFactory.getLog(SnsHeaderConverterUtil.class); private static final JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create(); diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishMessageResult.java similarity index 93% rename from spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java rename to spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishMessageResult.java index 326f774c27..f48e836449 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsResult.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishMessageResult.java @@ -13,7 +13,7 @@ * @author Matej Nedic * @since 4.1.0 */ -public record SnsResult( +public record PublishMessageResult( Message message, String messageId, @Nullable String sequenceNumber diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java index 0b18c62dfe..7eb24ea61a 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/PublishRequestMessagePair.java @@ -12,7 +12,7 @@ * @author Matej Nedic * @since 4.1.0 */ -public record PublishRequestMessagePair( +record PublishRequestMessagePair( PublishRequest publishRequest, Message originalMessage ) { diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java index 5175b182e3..9945a284a1 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncOperations.java @@ -24,7 +24,7 @@ public interface SnsAsyncOperations { * @param the message payload type * @return a CompletableFuture with the result */ - CompletableFuture> send(String destination, Message message) ; + CompletableFuture> send(String destination, Message message) ; /** * Converts and sends a payload to a destination. @@ -34,7 +34,7 @@ public interface SnsAsyncOperations { * @param the payload type * @return a CompletableFuture with the result */ - CompletableFuture> convertAndSend(String destination, T payload) ; + CompletableFuture> convertAndSend(String destination, T payload) ; /** * Converts and sends a payload with headers to a destination. @@ -45,7 +45,7 @@ public interface SnsAsyncOperations { * @param the payload type * @return a CompletableFuture with the result */ - CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers); + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers); /** * Converts and sends a payload with a post processor to a destination. @@ -56,7 +56,7 @@ public interface SnsAsyncOperations { * @param the payload type * @return a CompletableFuture with the result */ - CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor); + CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor); /** * Converts and sends a payload with headers and a post processor to a destination. @@ -68,8 +68,8 @@ public interface SnsAsyncOperations { * @param the payload type * @return a CompletableFuture with the result */ - CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, - @Nullable MessagePostProcessor postProcessor); + CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, + @Nullable MessagePostProcessor postProcessor); /** * Sends a notification with a message and subject to a destination. @@ -79,7 +79,7 @@ CompletableFuture> convertAndSend(String destination, T payload * @param subject the subject (can be null) * @return a CompletableFuture with the result */ - CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject); + CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject); /** * Sends a notification to a topic. @@ -89,7 +89,7 @@ CompletableFuture> convertAndSend(String destination, T payload * @param the notification payload type * @return a CompletableFuture with the result */ - CompletableFuture> sendNotification(String topic, SnsNotification notification); + CompletableFuture> sendNotification(String topic, SnsNotification notification); /** * Checks if a topic with the given ARN exists. diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java index 8a26f40924..a0e984217c 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplate.java @@ -58,7 +58,7 @@ public SnsAsyncTemplate(SnsAsyncClient snsAsyncClient, TopicArnResolver topicArn * @return a CompletableFuture with the result */ @Override - public CompletableFuture> send(String destination, Message message) { + public CompletableFuture> send(String destination, Message message) { Assert.notNull(destination, "destination cannot be null"); Assert.notNull(message, "message cannot be null"); @@ -74,7 +74,7 @@ public CompletableFuture> send(String destination, Message m } @Override - public CompletableFuture> convertAndSend(String destination, T payload) { + public CompletableFuture> convertAndSend(String destination, T payload) { Assert.notNull(destination, "destination cannot be null"); Assert.notNull(payload, "payload cannot be null"); @@ -82,7 +82,7 @@ public CompletableFuture> convertAndSend(String destination, T } @Override - public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) { + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers) { Assert.notNull(destination, "destination cannot be null"); Assert.notNull(payload, "payload cannot be null"); @@ -90,7 +90,7 @@ public CompletableFuture> convertAndSend(String destination, T } @Override - public CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) { + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable MessagePostProcessor postProcessor) { Assert.notNull(destination, "destination cannot be null"); Assert.notNull(payload, "payload cannot be null"); @@ -98,7 +98,7 @@ public CompletableFuture> convertAndSend(String destination, T } @Override - public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, @Nullable MessagePostProcessor postProcessor) { + public CompletableFuture> convertAndSend(String destination, T payload, @Nullable Map headers, @Nullable MessagePostProcessor postProcessor) { Assert.notNull(destination, "destination cannot be null"); Assert.notNull(payload, "payload cannot be null"); @@ -115,7 +115,7 @@ public CompletableFuture> convertAndSend(String destination, T } @Override - public CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject) { + public CompletableFuture> sendNotification(String destinationName, Object message, @Nullable String subject) { Assert.notNull(destinationName, "destinationName cannot be null"); Assert.notNull(message, "message cannot be null"); @@ -123,7 +123,7 @@ public CompletableFuture> sendNotification(String destinationN } @Override - public CompletableFuture> sendNotification(String topic, SnsNotification notification) { + public CompletableFuture> sendNotification(String topic, SnsNotification notification) { Assert.notNull(topic, "topic cannot be null"); Assert.notNull(notification, "notification cannot be null"); @@ -147,9 +147,9 @@ public CompletableFuture topicExists(String topicArn) { }); } - private CompletableFuture> publish(PublishRequest request, Message originalMessage) { + private CompletableFuture> publish(PublishRequest request, Message originalMessage) { return snsAsyncClient.publish(request) - .thenApply(response -> new SnsResult<>( + .thenApply(response -> new PublishMessageResult<>( originalMessage, response.messageId(), response.sequenceNumber() diff --git a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java index 2e058b44e6..4860f5ae4f 100644 --- a/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java +++ b/spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/core/async/SnsAsyncTemplateIntegrationTest.java @@ -139,7 +139,7 @@ void purgeQueue() { @Test void convertAndSendWithHeaders() throws Exception { - SnsResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, "async message", + PublishMessageResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, "async message", Map.of("custom-header", "custom-value")).get(); assertThat(result.messageId()).isNotNull(); @@ -159,7 +159,7 @@ void convertAndSendWithHeaders() throws Exception { @Test void convertAndSendWithPersonPayload() throws Exception { Person person = new Person("John"); - SnsResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, person, + PublishMessageResult result = snsAsyncTemplate.convertAndSend(standardTopicArn, person, Map.of("person-type", "employee", NOTIFICATION_SUBJECT_HEADER, "Person Update")).get(); assertThat(result.messageId()).isNotNull(); @@ -178,7 +178,7 @@ void convertAndSendWithPersonPayload() throws Exception { @Test void sendNotificationWithSubject() throws Exception { - SnsResult result = snsAsyncTemplate.sendNotification(standardTopicArn, + PublishMessageResult result = snsAsyncTemplate.sendNotification(standardTopicArn, "message with subject", "Test Subject").get(); assertThat(result.messageId()).isNotNull(); @@ -195,7 +195,7 @@ void sendNotificationWithSnsNotification() throws Exception { .header(NOTIFICATION_SUBJECT_HEADER, "Notification Subject") .header("notification-type", "alert").build(); - SnsResult result = snsAsyncTemplate.sendNotification(standardTopicArn, notification).get(); + PublishMessageResult result = snsAsyncTemplate.sendNotification(standardTopicArn, notification).get(); assertThat(result.messageId()).isNotNull(); assertThat(result.message().getPayload()).isEqualTo("notification payload"); @@ -221,7 +221,7 @@ void purgeQueue() { @Test void convertAndSendWithFifoHeaders() throws Exception { - SnsResult result = snsAsyncTemplate.convertAndSend(fifoTopicArn, "fifo message", + PublishMessageResult result = snsAsyncTemplate.convertAndSend(fifoTopicArn, "fifo message", Map.of(MESSAGE_GROUP_ID_HEADER, "group-1", MESSAGE_DEDUPLICATION_ID_HEADER, "dedup-1", "custom-fifo-header", "fifo-value")).get(); From 0dd43fe632a46dc52202ee5c62ca3f0923a8278f Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 12 Feb 2026 21:03:43 +0100 Subject: [PATCH 09/10] Adjust --- .../awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java index a8adbbe91e..6401407d4b 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sns/SnsAutoConfiguration.java @@ -103,6 +103,7 @@ public SnsSmsTemplate snsSmsTemplate(SnsClient snsClient) { return new SnsSmsTemplate(snsClient); } + @ConditionalOnClass({SnsAsyncClient.class, SnsAsyncTemplate.class}) @Configuration static class SnsAsyncTemplateConfiguration { From 76982b8d1940011958bd5a83bb7e2b3da1c7f320 Mon Sep 17 00:00:00 2001 From: matejnedic Date: Thu, 12 Feb 2026 21:05:38 +0100 Subject: [PATCH 10/10] Adjust --- .../java/io/awspring/cloud/sns/core/TopicMessageChannel.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java index 0e3000556d..e471acc2f3 100644 --- a/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java +++ b/spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/TopicMessageChannel.java @@ -40,8 +40,6 @@ */ public class TopicMessageChannel extends AbstractMessageChannel { - public JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create(); - private final SnsClient snsClient; private final Arn topicArn;