Skip to content

Commit afa79d8

Browse files
akuma8artembilan
authored andcommitted
GH-3226: Redis Stream Outbound Channel Adapter
Fixes #3226 * Redis stream message handler support. * This is the outbound part publishing message to the actual stream using ReactiveStreamOperations * Addition of more test cases with one using `MessageChannel`. * Improvements after PR review. * Removed failed test reading List from a Stream * Code style clean up * Remove `rawtypes` usage * Remove redundant inner classes for test model * Add `What's New` note
1 parent 4761800 commit afa79d8

File tree

7 files changed

+449
-67
lines changed

7 files changed

+449
-67
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.outbound;
18+
19+
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
20+
import org.springframework.data.redis.connection.stream.Record;
21+
import org.springframework.data.redis.connection.stream.StreamRecords;
22+
import org.springframework.data.redis.core.ReactiveRedisTemplate;
23+
import org.springframework.data.redis.core.ReactiveStreamOperations;
24+
import org.springframework.data.redis.hash.HashMapper;
25+
import org.springframework.data.redis.serializer.RedisSerializationContext;
26+
import org.springframework.expression.EvaluationContext;
27+
import org.springframework.expression.Expression;
28+
import org.springframework.expression.common.LiteralExpression;
29+
import org.springframework.integration.expression.ExpressionUtils;
30+
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
31+
import org.springframework.lang.Nullable;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.util.Assert;
34+
35+
import reactor.core.publisher.Mono;
36+
37+
/**
38+
* Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes
39+
* Message payload or Message itself (see {@link #extractPayload}) into a Redis stream using Reactive Stream operations.
40+
*
41+
* @author Attoumane Ahamadi
42+
* @author Artem Bilan
43+
*
44+
* @since 5.4
45+
*/
46+
public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler {
47+
48+
private final Expression streamKeyExpression;
49+
50+
private final ReactiveRedisConnectionFactory connectionFactory;
51+
52+
private EvaluationContext evaluationContext;
53+
54+
private boolean extractPayload = true;
55+
56+
private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
57+
58+
private RedisSerializationContext<String, ?> serializationContext = RedisSerializationContext.string();
59+
60+
@Nullable
61+
private HashMapper<String, ?, ?> hashMapper;
62+
63+
/**
64+
* Create an instance based on provided {@link ReactiveRedisConnectionFactory} and key for stream.
65+
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to use
66+
* @param streamKey the key for stream
67+
*/
68+
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) {
69+
this(connectionFactory, new LiteralExpression(streamKey));
70+
}
71+
72+
/**
73+
* Create an instance based on provided {@link ReactiveRedisConnectionFactory} and expression for stream key.
74+
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to use
75+
* @param streamKeyExpression the SpEL expression to evaluate a key for stream
76+
*/
77+
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory,
78+
Expression streamKeyExpression) {
79+
80+
Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null");
81+
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
82+
this.streamKeyExpression = streamKeyExpression;
83+
this.connectionFactory = connectionFactory;
84+
}
85+
86+
public void setSerializationContext(RedisSerializationContext<String, ?> serializationContext) {
87+
Assert.notNull(serializationContext, "'serializationContext' must not be null");
88+
this.serializationContext = serializationContext;
89+
}
90+
91+
/**
92+
* (Optional) Set the {@link HashMapper} used to create {@link #reactiveStreamOperations}.
93+
* The default {@link HashMapper} is defined from the provided {@link RedisSerializationContext}
94+
* @param hashMapper the wanted hashMapper
95+
* */
96+
public void setHashMapper(@Nullable HashMapper<String, ?, ?> hashMapper) {
97+
this.hashMapper = hashMapper;
98+
}
99+
100+
/**
101+
* Set to {@code true} to extract the payload; otherwise
102+
* the entire message is sent. Default {@code true}.
103+
* @param extractPayload false to not extract.
104+
*/
105+
public void setExtractPayload(boolean extractPayload) {
106+
this.extractPayload = extractPayload;
107+
}
108+
109+
@Override
110+
public String getComponentType() {
111+
return "redis:stream-outbound-channel-adapter";
112+
}
113+
114+
@Override
115+
@SuppressWarnings("unchecked")
116+
protected void onInit() {
117+
super.onInit();
118+
119+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
120+
121+
ReactiveRedisTemplate<String, ?> template =
122+
new ReactiveRedisTemplate<>(this.connectionFactory, this.serializationContext);
123+
this.reactiveStreamOperations =
124+
this.hashMapper == null
125+
? template.opsForStream()
126+
: template.opsForStream(
127+
(HashMapper<? super String, ? super Object, ? super Object>) this.hashMapper);
128+
}
129+
130+
@Override
131+
protected Mono<Void> handleMessageInternal(Message<?> message) {
132+
return Mono
133+
.fromSupplier(() -> {
134+
String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
135+
Assert.notNull(streamKey, "'streamKey' must not be null");
136+
return streamKey;
137+
})
138+
.flatMap((streamKey) -> {
139+
Object value = message;
140+
if (this.extractPayload) {
141+
value = message.getPayload();
142+
}
143+
144+
Record<String, ?> record =
145+
StreamRecords.objectBacked(value)
146+
.withStreamKey(streamKey);
147+
148+
return this.reactiveStreamOperations.add(record);
149+
})
150+
.then();
151+
}
152+
153+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.outbound;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
25+
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.beans.factory.annotation.Qualifier;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
30+
import org.springframework.data.redis.connection.stream.ObjectRecord;
31+
import org.springframework.data.redis.connection.stream.StreamOffset;
32+
import org.springframework.data.redis.core.ReactiveRedisTemplate;
33+
import org.springframework.data.redis.serializer.RedisSerializationContext;
34+
import org.springframework.data.redis.serializer.StringRedisSerializer;
35+
import org.springframework.integration.channel.DirectChannel;
36+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
37+
import org.springframework.integration.redis.rules.RedisAvailable;
38+
import org.springframework.integration.redis.rules.RedisAvailableRule;
39+
import org.springframework.integration.redis.rules.RedisAvailableTests;
40+
import org.springframework.integration.redis.util.Address;
41+
import org.springframework.integration.redis.util.Person;
42+
import org.springframework.messaging.Message;
43+
import org.springframework.messaging.MessageChannel;
44+
import org.springframework.messaging.support.GenericMessage;
45+
import org.springframework.test.annotation.DirtiesContext;
46+
import org.springframework.test.context.junit4.SpringRunner;
47+
48+
/**
49+
* @author Attoumane Ahamadi
50+
* @author Artem Bilan
51+
*
52+
* @since 5.4
53+
*/
54+
@RunWith(SpringRunner.class)
55+
@DirtiesContext
56+
public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests {
57+
58+
private static final String STREAM_KEY = "myStream";
59+
60+
@Autowired
61+
@Qualifier("streamChannel")
62+
private MessageChannel messageChannel;
63+
64+
@Autowired
65+
private ReactiveRedisConnectionFactory redisConnectionFactory;
66+
67+
@Autowired
68+
private ReactiveMessageHandlerAdapter handlerAdapter;
69+
70+
@Autowired
71+
private ReactiveRedisStreamMessageHandler streamMessageHandler;
72+
73+
@Before
74+
public void deleteStreamKey() {
75+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(this.redisConnectionFactory,
76+
RedisSerializationContext.string());
77+
template.delete(STREAM_KEY).block();
78+
}
79+
80+
81+
@Test
82+
@RedisAvailable
83+
public void integrationStreamOutboundTest() {
84+
String messagePayload = "Hello stream message";
85+
86+
messageChannel.send(new GenericMessage<>(messagePayload));
87+
88+
RedisSerializationContext<String, ?> serializationContext = redisSerializationContext();
89+
90+
ReactiveRedisTemplate<String, ?> template =
91+
new ReactiveRedisTemplate<>(redisConnectionFactory, serializationContext);
92+
93+
ObjectRecord<String, String> record =
94+
template.opsForStream()
95+
.read(String.class, StreamOffset.fromStart(STREAM_KEY))
96+
.blockFirst();
97+
98+
assertThat(record.getStream()).isEqualTo(STREAM_KEY);
99+
100+
assertThat(record.getValue()).isEqualTo(messagePayload);
101+
}
102+
103+
@Test
104+
@RedisAvailable
105+
public void explicitSerializationContextWithModelTest() {
106+
Address address = new Address("Rennes, France");
107+
Person person = new Person(address, "Attoumane");
108+
109+
Message<?> message = new GenericMessage<>(person);
110+
111+
RedisSerializationContext<String, ?> serializationContext = redisSerializationContext();
112+
113+
streamMessageHandler.setSerializationContext(serializationContext);
114+
streamMessageHandler.afterPropertiesSet();
115+
116+
handlerAdapter.handleMessage(message);
117+
118+
ReactiveRedisTemplate<String, ?> template =
119+
new ReactiveRedisTemplate<>(redisConnectionFactory, serializationContext);
120+
121+
ObjectRecord<String, Person> record =
122+
template.opsForStream()
123+
.read(Person.class, StreamOffset.fromStart(STREAM_KEY))
124+
.blockFirst();
125+
126+
assertThat(record.getStream()).isEqualTo(STREAM_KEY);
127+
assertThat(record.getValue().getName()).isEqualTo("Attoumane");
128+
assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France");
129+
}
130+
131+
132+
private RedisSerializationContext<String, ?> redisSerializationContext() {
133+
return RedisSerializationContext.fromSerializer(StringRedisSerializer.UTF_8);
134+
}
135+
136+
137+
@Configuration
138+
public static class ReactiveRedisStreamMessageHandlerTestsContext {
139+
140+
@Bean
141+
public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) {
142+
DirectChannel directChannel = new DirectChannel();
143+
directChannel.subscribe(messageHandlerAdapter);
144+
directChannel.setMaxSubscribers(1);
145+
return directChannel;
146+
}
147+
148+
149+
@Bean
150+
public ReactiveRedisStreamMessageHandler streamMessageHandler(
151+
ReactiveRedisConnectionFactory connectionFactory) {
152+
153+
return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY);
154+
}
155+
156+
@Bean
157+
public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(
158+
ReactiveRedisStreamMessageHandler streamMessageHandler) {
159+
160+
return new ReactiveMessageHandlerAdapter(streamMessageHandler);
161+
}
162+
163+
@Bean
164+
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
165+
return RedisAvailableRule.connectionFactory;
166+
}
167+
168+
}
169+
170+
}

0 commit comments

Comments
 (0)