Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/src/main/asciidoc/sns.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,45 @@ class NotificationService {
}
----

==== SNS Async Template

The starter automatically configures and registers a `SnsAsyncTemplate` bean when `SnsAsyncClient` bean is registered by user, providing non-blocking SNS operations.
`SnsAsyncTemplate` returns `CompletableFuture` for all operations, making it suitable for high-throughput async applications.

It supports the same payload types as `SnsTemplate` and uses the same Spring `MessageConverter` hirarchy inside of `DefaultSnsPublishMessageConverter` to convert payloads.

[source,java]
----
import io.awspring.cloud.sns.core.async.SnsAsyncTemplate;
import io.awspring.cloud.sns.core.async.PublishMessageResult;

import java.util.concurrent.CompletableFuture;

class NotificationService {
private final SnsAsyncTemplate snsAsyncTemplate;

NotificationService(SnsAsyncTemplate snsAsyncTemplate) {
this.snsAsyncTemplate = snsAsyncTemplate;
}

CompletableFuture<PublishMessageResult<String>> sendNotification() {
return snsAsyncTemplate.convertAndSend("topic-arn", "payload");
}
}
----

To customize message conversion, provide a custom `SnsPublishMessageConverter` bean:

[source,java]
----
@Bean
public SnsPublishMessageConverter customConverter() {
return new MyCustomConverter();
}
----

Autoconfiguration will pick it up and configure `SnsAsyncTemplate` automatically with it.

=== Using SNS Client

To have access to all lower level SNS operations, we recommend using `SnsClient` from AWS SDK. `SnsClient` bean is autoconfigured by `SnsAutoConfiguration`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import io.awspring.cloud.sns.core.SnsOperations;
import io.awspring.cloud.sns.core.SnsTemplate;
import io.awspring.cloud.sns.core.TopicArnResolver;
import io.awspring.cloud.sns.core.async.DefaultSnsPublishMessageConverter;
import io.awspring.cloud.sns.core.async.SnsAsyncOperations;
import io.awspring.cloud.sns.core.async.SnsAsyncTemplate;
import io.awspring.cloud.sns.core.async.SnsPublishMessageConverter;
import io.awspring.cloud.sns.core.batch.SnsBatchOperations;
import io.awspring.cloud.sns.core.batch.SnsBatchTemplate;
import io.awspring.cloud.sns.core.batch.converter.DefaultSnsMessageConverter;
Expand All @@ -38,12 +42,15 @@
import io.awspring.cloud.sns.core.batch.executor.SequentialBatchExecutionStrategy;
import io.awspring.cloud.sns.sms.SnsSmsOperations;
import io.awspring.cloud.sns.sms.SnsSmsTemplate;

import java.util.List;
import java.util.Optional;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
Expand All @@ -56,6 +63,7 @@
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsClient;
import tools.jackson.databind.json.JsonMapper;

Expand All @@ -71,22 +79,22 @@
* @author Mariusz Sondecki
*/
@AutoConfiguration
@ConditionalOnClass({ SnsClient.class, SnsTemplate.class })
@EnableConfigurationProperties({ SnsProperties.class })
@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
@ConditionalOnClass({SnsClient.class, SnsTemplate.class})
@EnableConfigurationProperties({SnsProperties.class})
@AutoConfigureAfter({CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class})
@ConditionalOnProperty(name = "spring.cloud.aws.sns.enabled", havingValue = "true", matchIfMissing = true)
public class SnsAutoConfiguration {

@ConditionalOnMissingBean
@Bean
public SnsClient snsClient(SnsProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer,
ObjectProvider<AwsConnectionDetails> connectionDetails,
ObjectProvider<SnsClientCustomizer> snsClientCustomizers,
ObjectProvider<AwsSyncClientCustomizer> awsSyncClientCustomizers) {
ObjectProvider<AwsConnectionDetails> connectionDetails,
ObjectProvider<SnsClientCustomizer> snsClientCustomizers,
ObjectProvider<AwsSyncClientCustomizer> awsSyncClientCustomizers) {
return awsClientBuilderConfigurer
.configureSyncClient(SnsClient.builder(), properties, connectionDetails.getIfAvailable(),
snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream())
.build();
.configureSyncClient(SnsClient.builder(), properties, connectionDetails.getIfAvailable(),
snsClientCustomizers.orderedStream(), awsSyncClientCustomizers.orderedStream())
.build();
}

@ConditionalOnMissingBean(SnsSmsOperations.class)
Expand All @@ -95,18 +103,41 @@ public SnsSmsTemplate snsSmsTemplate(SnsClient snsClient) {
return new SnsSmsTemplate(snsClient);
}

@ConditionalOnClass({SnsAsyncClient.class, SnsAsyncTemplate.class})
@Configuration
static class SnsAsyncTemplateConfiguration {

@Bean
@ConditionalOnMissingBean(SnsAsyncOperations.class)
@ConditionalOnBean(SnsAsyncClient.class)
public SnsAsyncTemplate snsAsyncTemplate(SnsAsyncClient snsAsyncClient, Optional<TopicArnResolver> topicArnResolver, SnsPublishMessageConverter snsPublishMessageConverter) {
return topicArnResolver.map(it -> new SnsAsyncTemplate(snsAsyncClient, it, snsPublishMessageConverter))
.orElseGet(() -> new SnsAsyncTemplate(snsAsyncClient, snsPublishMessageConverter));
}

@Bean
@ConditionalOnMissingBean(SnsPublishMessageConverter.class)
@ConditionalOnBean(SnsAsyncClient.class)
public SnsPublishMessageConverter snsPublishMessageConverter(Optional<JsonMapper> jsonMapper) {
JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter(
jsonMapper.orElseGet(JsonMapper::new));
converter.setSerializedPayloadClass(String.class);
return new DefaultSnsPublishMessageConverter(converter);
}
}

@ConditionalOnClass(name = "tools.jackson.databind.json.JsonMapper")
@Configuration
static class SnsConfiguration {
@ConditionalOnMissingBean(SnsOperations.class)
@Bean
public SnsTemplate snsTemplate(SnsClient snsClient, Optional<JsonMapper> jsonMapper,
Optional<TopicArnResolver> topicArnResolver, ObjectProvider<ChannelInterceptor> interceptors) {
Optional<TopicArnResolver> topicArnResolver, ObjectProvider<ChannelInterceptor> interceptors) {
JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter(
jsonMapper.orElseGet(JsonMapper::new));
jsonMapper.orElseGet(JsonMapper::new));
converter.setSerializedPayloadClass(String.class);
SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter))
.orElseGet(() -> new SnsTemplate(snsClient, converter));
.orElseGet(() -> new SnsTemplate(snsClient, converter));
interceptors.forEach(snsTemplate::addChannelInterceptor);

return snsTemplate;
Expand All @@ -120,12 +151,12 @@ static class LegacyJackson2Configuration {
@ConditionalOnMissingBean(SnsOperations.class)
@Bean
public SnsTemplate snsTemplate(SnsClient snsClient, Optional<ObjectMapper> objectMapper,
Optional<TopicArnResolver> topicArnResolver, ObjectProvider<ChannelInterceptor> interceptors) {
Optional<TopicArnResolver> topicArnResolver, ObjectProvider<ChannelInterceptor> interceptors) {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setSerializedPayloadClass(String.class);
objectMapper.ifPresent(converter::setObjectMapper);
SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter))
.orElseGet(() -> new SnsTemplate(snsClient, converter));
.orElseGet(() -> new SnsTemplate(snsClient, converter));
interceptors.forEach(snsTemplate::addChannelInterceptor);

return snsTemplate;
Expand Down Expand Up @@ -173,8 +204,7 @@ public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers)
resolvers.add(getNotificationHandlerMethodArgumentResolver(snsClient));
}
};
}
else if (JacksonPresent.isJackson2Present()) {
} else if (JacksonPresent.isJackson2Present()) {
return new WebMvcConfigurer() {
@Override
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers) {
Expand All @@ -183,7 +213,7 @@ public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers)
};
}
throw new IllegalStateException(
"SecretsManagerPropertySource requires a Jackson 2 or Jackson 3 library on the classpath");
"SnsWebMvc integration requires a Jackson 2 or Jackson 3 library on the classpath");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import io.awspring.cloud.sns.core.batch.SnsBatchTemplate;
import io.awspring.cloud.sns.core.batch.converter.SnsMessageConverter;
import io.awspring.cloud.sns.core.batch.executor.BatchExecutionStrategy;
import io.awspring.cloud.sns.core.async.SnsAsyncTemplate;
import io.awspring.cloud.sns.sms.SnsSmsOperations;
import io.awspring.cloud.sns.sms.SnsSmsTemplate;

import java.net.URI;


import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.AutoConfigurations;
Expand All @@ -43,6 +46,7 @@
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsClient;

/**
Expand Down Expand Up @@ -131,6 +135,37 @@ void customChannelInterceptorCanBeConfigured() {
.run(context -> assertThat(context).hasSingleBean(CustomChannelInterceptor.class));
}

@Nested
class SnsAsyncTemplateTests {

@Test
void snsAsyncTemplateIsNotCreatedWhenSnsAsyncClientIsNotPresent() {
contextRunner.run(context -> {
assertThat(context).hasSingleBean(SnsClient.class);
assertThat(context).hasSingleBean(SnsTemplate.class);
assertThat(context).doesNotHaveBean(SnsAsyncTemplate.class);
});
}

@Test
void snsAsyncTemplateIsCreatedWhenSnsAsyncClientIsPresent() {
contextRunner.withUserConfiguration(SnsAsyncClientConfiguration.class).run(context -> {
assertThat(context).hasSingleBean(SnsClient.class);
assertThat(context).hasSingleBean(SnsTemplate.class);
assertThat(context).hasSingleBean(SnsAsyncClient.class);
assertThat(context).hasSingleBean(SnsAsyncTemplate.class);
});
}

@Test
void bothAsyncTemplatesAndOperationsAreInjectable() {
contextRunner.withUserConfiguration(SnsAsyncClientConfiguration.class, InjectingAsyncTemplatesConfiguration.class).run(context -> {
assertThat(context.isRunning()).isTrue();
assertThat(context).hasSingleBean(SnsAsyncTemplate.class);
});
}
}


@Configuration(proxyBeanMethods = false)
static class CustomTopicArnResolverConfiguration {
Expand Down Expand Up @@ -198,4 +233,22 @@ ChannelInterceptor customChannelInterceptor() {

static class CustomChannelInterceptor implements ChannelInterceptor {
}

@Configuration(proxyBeanMethods = false)
static class SnsAsyncClientConfiguration {

@Bean
SnsAsyncClient snsAsyncClient() {
return mock(SnsAsyncClient.class);
}
}

@Configuration(proxyBeanMethods = false)
static class InjectingAsyncTemplatesConfiguration {
@Bean
ApplicationRunner asyncRunner1(SnsAsyncTemplate snsAsyncTemplate) {
return args -> {
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
*/
public class TopicMessageChannel extends AbstractMessageChannel {

public JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create();

private final SnsClient snsClient;

private final Arn topicArn;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.awspring.cloud.sns.core.async;

import io.awspring.cloud.sns.core.SnsHeaderConverterUtil;
import org.jspecify.annotations.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER;
import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER;
import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER;

/**
* Default implementation of {@link SnsPublishMessageConverter}.
*
* @author Matej Nedic
* @since 4.1.0
*/
public class DefaultSnsPublishMessageConverter implements SnsPublishMessageConverter {

private final MessageConverter messageConverter;

public DefaultSnsPublishMessageConverter() {
this(null);
}

public DefaultSnsPublishMessageConverter(@Nullable MessageConverter messageConverter) {
this.messageConverter = initMessageConverter(messageConverter);
}

@Override
public <T> PublishRequestMessagePair<T> convert(Message<T> originalMessage) {
Assert.notNull(originalMessage, "message cannot be null");

PublishRequest.Builder publishRequest = PublishRequest.builder();
populateHeaders(publishRequest, originalMessage);

Message<?> convertedMessage = messageConverter.toMessage(
originalMessage.getPayload(),
originalMessage.getHeaders()
);
publishRequest.message(convertedMessage.getPayload().toString());

return new PublishRequestMessagePair<>(publishRequest.build(), originalMessage);
}

@Override
public <T> PublishRequestMessagePair<T> convert(T payload, Map<String, Object> headers) {
Assert.notNull(payload, "payload cannot be null");
Assert.notNull(headers, "headers cannot be null");

Message<T> originalMessage = MessageBuilder
.withPayload(payload)
.copyHeaders(headers)
.build();

return convert(originalMessage);
}

private <T> void populateHeaders(PublishRequest.Builder publishRequest, Message<T> message) {
Map<String, MessageAttributeValue> messageAttributes = SnsHeaderConverterUtil.toSnsMessageAttributes(message);
if (!messageAttributes.isEmpty()) {
publishRequest.messageAttributes(messageAttributes);
}

Optional.ofNullable(message.getHeaders().get(NOTIFICATION_SUBJECT_HEADER, String.class))
.ifPresent(publishRequest::subject);

Optional.ofNullable(message.getHeaders().get(MESSAGE_GROUP_ID_HEADER, String.class))
.ifPresent(publishRequest::messageGroupId);

Optional.ofNullable(message.getHeaders().get(MESSAGE_DEDUPLICATION_ID_HEADER, String.class))
.ifPresent(publishRequest::messageDeduplicationId);
}


private static CompositeMessageConverter initMessageConverter(@Nullable MessageConverter messageConverter) {
List<MessageConverter> converters = new ArrayList<>();

StringMessageConverter stringMessageConverter = new StringMessageConverter();
stringMessageConverter.setSerializedPayloadClass(String.class);
converters.add(stringMessageConverter);

if (messageConverter != null) {
converters.add(messageConverter);
}

return new CompositeMessageConverter(converters);
}
}
Loading