Skip to content

Commit e81a8a8

Browse files
committed
[FLINK-38606][Connectors/MongoDB] Support for Flink 2.1.0 API
1 parent 34f14a6 commit e81a8a8

File tree

10 files changed

+20
-25
lines changed

10 files changed

+20
-25
lines changed

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.ExecutionConfig;
2222
import org.apache.flink.api.connector.sink2.Sink;
2323
import org.apache.flink.api.connector.sink2.SinkWriter;
24+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2425
import org.apache.flink.api.java.ClosureCleaner;
2526
import org.apache.flink.connector.base.DeliveryGuarantee;
2627
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
@@ -78,7 +79,7 @@ public static <IN> MongoSinkBuilder<IN> builder() {
7879
}
7980

8081
@Override
81-
public SinkWriter<IN> createWriter(InitContext context) {
82+
public SinkWriter<IN> createWriter(WriterInitContext context) {
8283
return new MongoWriter<>(
8384
connectionOptions,
8485
writeOptions,

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.flink.api.common.functions.util.ListCollector;
2323
import org.apache.flink.api.common.operators.MailboxExecutor;
2424
import org.apache.flink.api.common.serialization.SerializationSchema;
25-
import org.apache.flink.api.connector.sink2.Sink;
2625
import org.apache.flink.api.connector.sink2.SinkWriter;
26+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2727
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
2828
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
2929
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
@@ -89,7 +89,7 @@ public MongoWriter(
8989
MongoConnectionOptions connectionOptions,
9090
MongoWriteOptions writeOptions,
9191
boolean flushOnCheckpoint,
92-
Sink.InitContext initContext,
92+
WriterInitContext initContext,
9393
MongoSerializationSchema<IN> serializationSchema) {
9494
this.connectionOptions = checkNotNull(connectionOptions);
9595
this.writeOptions = checkNotNull(writeOptions);

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,23 @@
1818
package org.apache.flink.connector.mongodb.sink.writer.context;
1919

2020
import org.apache.flink.annotation.Internal;
21-
import org.apache.flink.api.connector.sink2.Sink;
21+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2222
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
2323

2424
/** Default {@link MongoSinkContext} implementation. */
2525
@Internal
2626
public class DefaultMongoSinkContext implements MongoSinkContext {
2727

28-
private final Sink.InitContext initContext;
28+
private final WriterInitContext initContext;
2929
private final MongoWriteOptions writeOptions;
3030

31-
public DefaultMongoSinkContext(Sink.InitContext initContext, MongoWriteOptions writeOptions) {
31+
public DefaultMongoSinkContext(WriterInitContext initContext, MongoWriteOptions writeOptions) {
3232
this.initContext = initContext;
3333
this.writeOptions = writeOptions;
3434
}
3535

3636
@Override
37-
public Sink.InitContext getInitContext() {
37+
public WriterInitContext getInitContext() {
3838
return initContext;
3939
}
4040

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.connector.mongodb.sink.writer.context;
1919

2020
import org.apache.flink.annotation.PublicEvolving;
21-
import org.apache.flink.api.connector.sink2.Sink;
21+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2222
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
2323
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
2424

@@ -27,7 +27,7 @@
2727
public interface MongoSinkContext {
2828

2929
/** Returns the current sink's init context. */
30-
Sink.InitContext getInitContext();
30+
WriterInitContext getInitContext();
3131

3232
/** Returns the current process time in flink. */
3333
long processTime();

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
import org.apache.flink.api.connector.source.SplitEnumerator;
2727
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2828
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
29-
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
3029
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
31-
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3230
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
3331
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
3432
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
@@ -134,8 +132,6 @@ public Boundedness getBoundedness() {
134132

135133
@Override
136134
public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext readerContext) {
137-
FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementsQueue =
138-
new FutureCompletingBlockingQueue<>();
139135

140136
MongoSourceReaderContext mongoReaderContext =
141137
new MongoSourceReaderContext(readerContext, limit);
@@ -150,7 +146,6 @@ public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext read
150146
mongoReaderContext);
151147

152148
return new MongoSourceReader<>(
153-
elementsQueue,
154149
splitReaderSupplier,
155150
new MongoRecordEmitter<>(deserializationSchema),
156151
mongoReaderContext);

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919

2020
import org.apache.flink.annotation.Internal;
2121
import org.apache.flink.connector.base.source.reader.RecordEmitter;
22-
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
2322
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
2423
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
2524
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
26-
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
2725
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
2826
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplitState;
2927
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
@@ -49,13 +47,11 @@ public class MongoSourceReader<OUT>
4947
private static final Logger LOG = LoggerFactory.getLogger(MongoSourceReader.class);
5048

5149
public MongoSourceReader(
52-
FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementQueue,
5350
Supplier<SplitReader<BsonDocument, MongoSourceSplit>> splitReaderSupplier,
5451
RecordEmitter<BsonDocument, OUT, MongoSourceSplitState> recordEmitter,
5552
MongoSourceReaderContext readerContext) {
5653
super(
57-
elementQueue,
58-
new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier),
54+
new SingleThreadFetcherManager<>(splitReaderSupplier, readerContext.getConfiguration()),
5955
recordEmitter,
6056
readerContext.getConfiguration(),
6157
readerContext);

flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
package org.apache.flink.connector.mongodb.sink;
1919

2020
import org.apache.flink.api.common.functions.MapFunction;
21-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2221
import org.apache.flink.api.common.state.CheckpointListener;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.RestartStrategyOptions;
2324
import org.apache.flink.connector.base.DeliveryGuarantee;
2425
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
2526
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -101,9 +102,10 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
101102
throws Exception {
102103
final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
103104
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
104-
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
105+
Configuration config = new Configuration();
106+
config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
107+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
105108
env.enableCheckpointing(100L);
106-
env.setRestartStrategy(RestartStrategies.noRestart());
107109

108110
env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
109111
env.execute();

flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ void testSinkContext() throws Exception {
243243

244244
MongoSerializationSchema<Document> testSerializationSchema =
245245
(element, context) -> {
246-
assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
246+
assertThat(context.getInitContext().getTaskInfo().getIndexOfThisSubtask())
247+
.isEqualTo(0);
247248
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
248249
assertThat(context.processTime())
249250
.isEqualTo(

flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ private List<ResolvedExpression> resolveSQLFilterToExpression(
286286
RexNodeExpression rexExp =
287287
(RexNodeExpression) tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
288288

289-
RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
289+
RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, rexExp.getRexNode());
290290
// converts the cnf condition to a list of AND conditions
291291
List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);
292292

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ under the License.
5757
<mongodb7.version>7.0.12</mongodb7.version>
5858
<mongodb.version>${mongodb4.version}</mongodb.version>
5959

60-
<flink.version>1.20.0</flink.version>
60+
<flink.version>2.1.0</flink.version>
6161
<scala.binary.version>2.12</scala.binary.version>
6262
<scala-library.version>2.12.7</scala-library.version>
6363
<junit5.version>5.8.1</junit5.version>

0 commit comments

Comments (0)