Skip to content

Commit dfd9142

Browse files
fnu, rohanartembilan
authored andcommitted
INT-4566: R2DBC Outbound Channel Adapter
JIRA: https://jira.spring.io/browse/INT-4566 * Fixed review comments * added `DELETE` and `Criteria` implementation for outbound channel adapter * Clean up code style * Add initial docs
1 parent b860a2e commit dfd9142

File tree

8 files changed

+725
-0
lines changed

8 files changed

+725
-0
lines changed

build.gradle

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ ext {
105105
springWsVersion = '3.0.9.RELEASE'
106106
tomcatVersion = "9.0.36"
107107
xstreamVersion = '1.4.12'
108+
r2dbch2Version='0.8.4.RELEASE'
108109

109110
javaProjects = subprojects - project(':spring-integration-bom')
110111
}
@@ -637,6 +638,17 @@ project('spring-integration-mongodb') {
637638
}
638639
}
639640

641+
project('spring-integration-r2dbc') {
642+
description = 'Spring Integration R2DBC Support'
643+
dependencies {
644+
api project(':spring-integration-core')
645+
api ('org.springframework.data:spring-data-r2dbc') {
646+
exclude group: 'org.springframework'
647+
}
648+
testImplementation "io.r2dbc:r2dbc-h2:$r2dbch2Version"
649+
}
650+
}
651+
640652
project('spring-integration-mqtt') {
641653
description = 'Spring Integration MQTT Support'
642654
dependencies {
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.r2dbc.outbound;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.springframework.data.r2dbc.core.DatabaseClient;
23+
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
24+
import org.springframework.data.relational.core.query.Criteria;
25+
import org.springframework.data.relational.core.query.Update;
26+
import org.springframework.data.relational.core.sql.SqlIdentifier;
27+
import org.springframework.expression.Expression;
28+
import org.springframework.expression.TypeLocator;
29+
import org.springframework.expression.common.LiteralExpression;
30+
import org.springframework.expression.spel.support.StandardEvaluationContext;
31+
import org.springframework.expression.spel.support.StandardTypeLocator;
32+
import org.springframework.integration.expression.ExpressionUtils;
33+
import org.springframework.integration.expression.ValueExpression;
34+
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
35+
import org.springframework.lang.Nullable;
36+
import org.springframework.messaging.Message;
37+
import org.springframework.util.Assert;
38+
39+
import reactor.core.publisher.Mono;
40+
41+
42+
/**
43+
* Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes
44+
* Message payload into a Relational Database, using reactive r2dbc support.
45+
*
46+
* @author Rohan Mukesh
47+
* @author Artem Bilan
48+
*
49+
* @since 5.4
50+
*/
51+
public class R2dbcMessageHandler extends AbstractReactiveMessageHandler {
52+
53+
private final R2dbcEntityOperations r2dbcEntityOperations;
54+
55+
private StandardEvaluationContext evaluationContext;
56+
57+
private Expression queryTypeExpression = new ValueExpression<>(Type.INSERT);
58+
59+
@Nullable
60+
private Expression tableNameExpression;
61+
62+
@Nullable
63+
private Expression valuesExpression;
64+
65+
@Nullable
66+
private Expression criteriaExpression;
67+
68+
private volatile boolean initialized = false;
69+
70+
/**
71+
* Construct this instance using a fully created and initialized instance of provided
72+
* {@link R2dbcEntityOperations}
73+
* @param r2dbcEntityOperations The R2dbcEntityOperations implementation.
74+
*/
75+
public R2dbcMessageHandler(R2dbcEntityOperations r2dbcEntityOperations) {
76+
Assert.notNull(r2dbcEntityOperations, "'r2dbcEntityOperations' must not be null");
77+
this.r2dbcEntityOperations = r2dbcEntityOperations;
78+
}
79+
80+
81+
public void setQueryType(R2dbcMessageHandler.Type type) {
82+
setQueryTypeExpression(new ValueExpression<>(type));
83+
}
84+
85+
public void setQueryTypeExpression(Expression queryTypeExpression) {
86+
Assert.notNull(queryTypeExpression, "'queryTypeExpression' must not be null");
87+
this.queryTypeExpression = queryTypeExpression;
88+
}
89+
90+
public void setTableName(String tableName) {
91+
setTableNameExpression(new LiteralExpression(tableName));
92+
}
93+
94+
public void setTableNameExpression(Expression tableNameExpression) {
95+
this.tableNameExpression = tableNameExpression;
96+
}
97+
98+
public void setValuesExpression(Expression valuesExpression) {
99+
this.valuesExpression = valuesExpression;
100+
}
101+
102+
public void setCriteriaExpression(Expression criteriaExpression) {
103+
this.criteriaExpression = criteriaExpression;
104+
}
105+
106+
107+
@Override
108+
public String getComponentType() {
109+
return "r2dbc:reactive-outbound-channel-adapter";
110+
}
111+
112+
@Override
113+
protected void onInit() {
114+
super.onInit();
115+
116+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
117+
TypeLocator typeLocator = this.evaluationContext.getTypeLocator();
118+
if (typeLocator instanceof StandardTypeLocator) {
119+
//Register R2dbc criteria API package so FQCN can be avoided in query-expression.
120+
((StandardTypeLocator) typeLocator).registerImport("org.springframework.data.relational.core.query");
121+
}
122+
this.initialized = true;
123+
124+
}
125+
126+
@Override
127+
protected Mono<Void> handleMessageInternal(Message<?> message) {
128+
Assert.isTrue(this.initialized, "The instance is not yet initialized. Invoke its afterPropertiesSet() method");
129+
return Mono.fromSupplier(() -> this.queryTypeExpression.getValue(this.evaluationContext, message, Type.class))
130+
.flatMap(mode -> {
131+
switch (mode) {
132+
case INSERT:
133+
return handleInsert(message);
134+
case UPDATE:
135+
return handleUpdate(message);
136+
case DELETE:
137+
return handleDelete(message);
138+
default:
139+
return Mono.error(new IllegalArgumentException());
140+
}
141+
}).then();
142+
}
143+
144+
145+
private Mono<Void> handleDelete(Message<?> message) {
146+
if (this.tableNameExpression != null) {
147+
String tableName = evaluateTableNameExpression(message);
148+
Criteria criteria = evaluateCriteriaExpression(message);
149+
DatabaseClient.DeleteMatchingSpec deleteSpec =
150+
this.r2dbcEntityOperations.getDatabaseClient()
151+
.delete()
152+
.from(tableName);
153+
return deleteSpec.matching(criteria)
154+
.then();
155+
}
156+
else {
157+
return this.r2dbcEntityOperations.delete(message.getPayload())
158+
.then();
159+
}
160+
}
161+
162+
private Mono<Void> handleUpdate(Message<?> message) {
163+
if (this.tableNameExpression != null) {
164+
String tableName = evaluateTableNameExpression(message);
165+
Map<String, Object> values = evaluateValuesExpression(message);
166+
Map<SqlIdentifier, Object> updateMap = transformIntoSqlIdentifierMap(values);
167+
Criteria criteria = evaluateCriteriaExpression(message);
168+
DatabaseClient.GenericUpdateSpec updateSpec =
169+
this.r2dbcEntityOperations.getDatabaseClient().update()
170+
.table(tableName);
171+
return updateSpec.using(Update.from(updateMap))
172+
.matching(criteria)
173+
.then();
174+
}
175+
else {
176+
return this.r2dbcEntityOperations.update(message.getPayload())
177+
.then();
178+
}
179+
}
180+
181+
private Map<SqlIdentifier, Object> transformIntoSqlIdentifierMap(Map<String, Object> values) {
182+
Map<SqlIdentifier, Object> sqlIdentifierObjectMap = new HashMap<>();
183+
values.forEach((k, v) -> sqlIdentifierObjectMap.put(SqlIdentifier.unquoted(k), v));
184+
return sqlIdentifierObjectMap;
185+
}
186+
187+
private Mono<Void> handleInsert(Message<?> message) {
188+
if (this.tableNameExpression != null) {
189+
String tableName = evaluateTableNameExpression(message);
190+
Map<String, Object> values = evaluateValuesExpression(message);
191+
DatabaseClient.GenericInsertSpec<Map<String, Object>> insertSpec =
192+
this.r2dbcEntityOperations.getDatabaseClient()
193+
.insert()
194+
.into(tableName);
195+
for (Map.Entry<String, Object> entry : values.entrySet()) {
196+
insertSpec = insertSpec.value(entry.getKey(), entry.getValue());
197+
}
198+
return insertSpec.then();
199+
}
200+
else {
201+
return this.r2dbcEntityOperations.insert(message.getPayload())
202+
.then();
203+
}
204+
}
205+
206+
private String evaluateTableNameExpression(Message<?> message) {
207+
String tableName = this.tableNameExpression.getValue(this.evaluationContext, message, String.class);
208+
Assert.notNull(tableName, "'tableNameExpression' must not evaluate to null");
209+
return tableName;
210+
}
211+
212+
@SuppressWarnings("unchecked")
213+
private Map<String, Object> evaluateValuesExpression(Message<?> message) {
214+
Map<String, Object> fieldValues =
215+
(Map<String, Object>) this.valuesExpression.getValue(this.evaluationContext, message, Map.class);
216+
Assert.notNull(fieldValues, "'valuesExpression' must not evaluate to null");
217+
return fieldValues;
218+
}
219+
220+
private Criteria evaluateCriteriaExpression(Message<?> message) {
221+
Criteria criteria =
222+
this.criteriaExpression.getValue(this.evaluationContext, message, Criteria.class);
223+
Assert.notNull(criteria, "'criteriaExpression' must not evaluate to null");
224+
return criteria;
225+
}
226+
227+
228+
/**
229+
* /**
230+
* The mode for the {@link R2dbcMessageHandler}.
231+
*/
232+
public enum Type {
233+
234+
/**
235+
* Set a {@link R2dbcMessageHandler} into an {@code insert} mode.
236+
*/
237+
INSERT,
238+
239+
/**
240+
* Set a {@link R2dbcMessageHandler} into an {@code update} mode.
241+
*/
242+
UPDATE,
243+
244+
/**
245+
* Set a {@link R2dbcMessageHandler} into a {@code delete} mode.
246+
*/
247+
DELETE,
248+
249+
}
250+
251+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.r2dbc.outbound;
18+
19+
import org.springframework.data.annotation.Id;
20+
import org.springframework.data.relational.core.mapping.Table;
21+
22+
/**
23+
* @author Rohan Mukesh
24+
*
25+
* @since 5.4
26+
*/
27+
@Table
28+
class Person {
29+
30+
@Id
31+
Integer id;
32+
33+
String name;
34+
35+
Integer age;
36+
37+
public void setId(Integer id) {
38+
this.id = id;
39+
}
40+
41+
public void setName(String name) {
42+
this.name = name;
43+
}
44+
45+
public void setAge(Integer age) {
46+
this.age = age;
47+
}
48+
49+
Person(String name, Integer age) {
50+
this.name = name;
51+
this.age = age;
52+
}
53+
54+
public Integer getId() {
55+
return this.id;
56+
}
57+
58+
public String getName() {
59+
return this.name;
60+
}
61+
62+
public Integer getAge() {
63+
return this.age;
64+
}
65+
66+
}

0 commit comments

Comments
 (0)