Commit 55008c5

[FLINK-37818] Add NoopCommitter for non-EOS
Add NoopCommitter so that EOS assumptions do not leak into non-EOS sinks. (cherry picked from commit 0455935)
1 parent d54d6f5 commit 55008c5
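
For orientation, a minimal usage sketch (not part of this commit) of a sink configured with a non-exactly-once guarantee; with this change such a sink is paired with the new NoopCommitter instead of the transactional KafkaCommitter. The bootstrap servers, topic, and string serializer below are illustrative placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

class NonEosSinkExample {
    static KafkaSink<String> build() {
        // A non-EOS sink: no transactional id prefix is required and, after this
        // commit, createCommitter() returns a NoopCommitter rather than the
        // transactional KafkaCommitter.
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")                 // placeholder
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // non-EOS
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("example-topic")             // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}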

File tree: 8 files changed, +147 -42 lines changed


flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties

Lines changed: 4 additions & 2 deletions
@@ -37,7 +37,8 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
 # If you want to investigate test failures, overwrite the level as above
 logger.container.name = container
 logger.container.level = OFF
-logger.container.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.container.additivity = false
 logger.container.appenderRef.containerappender.ref = ContainerLogger
 
 logger.kafkacontainer.name = container.kafka
@@ -48,7 +49,8 @@ logger.flinkcontainer.level = OFF
 
 logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
 logger.flinkenv.level = OFF
-logger.flinkenv.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.flinkenv.additivity = false
 logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
 
 appender.containerappender.name = ContainerLogger

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 11 additions & 7 deletions
@@ -32,6 +32,7 @@
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
+import org.apache.flink.connector.kafka.sink.internal.NoopCommitter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -111,13 +112,16 @@ public static <IN> KafkaSinkBuilder<IN> builder() {
     @Internal
     @Override
     public Committer<KafkaCommittable> createCommitter(CommitterInitContext context) {
-        return new KafkaCommitter(
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context.getTaskInfo().getIndexOfThisSubtask(),
-                context.getTaskInfo().getAttemptNumber(),
-                transactionNamingStrategy == TransactionNamingStrategy.POOLING,
-                FlinkKafkaInternalProducer::new);
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            return new KafkaCommitter(
+                    kafkaProducerConfig,
+                    transactionalIdPrefix,
+                    context.getTaskInfo().getIndexOfThisSubtask(),
+                    context.getTaskInfo().getAttemptNumber(),
+                    transactionNamingStrategy == TransactionNamingStrategy.POOLING,
+                    FlinkKafkaInternalProducer::new);
+        }
+        return new NoopCommitter();
     }
 
     @Internal

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ public class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
     private final WritableBackchannel<TransactionFinished> backchannel;
     @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer;
 
-    KafkaCommitter(
+    public KafkaCommitter(
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
             int subtaskId,

NoopCommitter.java (new file, package org.apache.flink.connector.kafka.sink.internal)

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.kafka.sink.KafkaCommittable;
+
+import java.util.Collection;
+
+/**
+ * The committer to be used for non exactly-once delivery guarantees.
+ *
+ * <p>This committer does not commit any records. It is needed because the current {@link
+ * org.apache.flink.api.connector.sink2.Sink} design supports only either transactional or
+ * non-transactional operation and the {@link org.apache.flink.connector.kafka.sink.KafkaSink} is
+ * doing both through {@link org.apache.flink.connector.base.DeliveryGuarantee}s.
+ */
+@Internal
+public class NoopCommitter implements Committer<KafkaCommittable> {
+    @Override
+    public void commit(Collection<CommitRequest<KafkaCommittable>> committables) {}
+
+    @Override
+    public void close() {}
+}
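
A tiny smoke sketch of the contract the new class fulfills; NoopCommitter is marked @Internal, so this is illustrative only and not part of the commit, and the empty commit-request list is a placeholder.

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.internal.NoopCommitter;

import java.util.Collections;

class NoopCommitterSmoke {
    public static void main(String[] args) throws Exception {
        // Committer extends AutoCloseable, so try-with-resources works; commit()
        // simply returns without touching Kafka or the commit requests.
        try (Committer<KafkaCommittable> committer = new NoopCommitter()) {
            committer.commit(Collections.emptyList());
        }
    }
}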

IntegerRecordSerializer.java (new file, package org.apache.flink.connector.kafka.sink)

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+
+/** mock recordSerializer for KafkaSink. */
+class IntegerRecordSerializer
+        implements KafkaRecordSerializationSchema<Integer>, KafkaDatasetFacetProvider {
+    private final String topic;
+
+    IntegerRecordSerializer(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            Integer element, KafkaSinkContext context, Long timestamp) {
+        if (element == null) {
+            // in general, serializers should be allowed to skip invalid elements
+            return null;
+        }
+        byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
+        return new ProducerRecord<>(topic, bytes, bytes);
+    }
+
+    @Override
+    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+        return Optional.of(
+                new DefaultKafkaDatasetFacet(
+                        DefaultKafkaDatasetIdentifier.ofTopics(
+                                Collections.singletonList(KafkaWriterTestBase.topic))));
+    }
+}
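
As a side note on the serializer above, a self-contained sketch (not part of the commit) of the 4-byte big-endian encoding it applies to both key and value; the value 42 is an arbitrary example.

import java.nio.ByteBuffer;

class IntegerEncodingExample {
    public static void main(String[] args) {
        // Same encoding the test serializer uses: a 4-byte, big-endian
        // (ByteBuffer default) representation of the int.
        byte[] bytes = ByteBuffer.allocate(4).putInt(42).array();
        int roundTripped = ByteBuffer.wrap(bytes).getInt();
        System.out.println(roundTripped); // prints 42
    }
}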

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

Lines changed: 27 additions & 0 deletions
@@ -64,6 +64,7 @@
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.InjectMiniCluster;
@@ -90,6 +91,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
@@ -127,6 +129,7 @@
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for using KafkaSink writing to a Kafka cluster. */
 @Testcontainers
@@ -545,6 +548,30 @@ public void checkMigration(
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(DeliveryGuarantee.class)
+    void ensureUniqueTransactionalIdPrefixIfNeeded(DeliveryGuarantee guarantee) throws Exception {
+        KafkaSinkBuilder<Integer> builder =
+                new KafkaSinkBuilder<Integer>()
+                        .setDeliveryGuarantee(guarantee)
+                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setRecordSerializer(new IntegerRecordSerializer("topic"));
+
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100);
+        DataStreamSource<Integer> source = env.fromData(1, 2);
+        if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            assertThatThrownBy(builder::build).hasMessageContaining("unique");
+        } else {
+            source.sinkTo(builder.build());
+            source.sinkTo(builder.build());
+
+            env.execute();
+        }
+    }
+
     private static Configuration createConfiguration(int parallelism) {
         final Configuration config = new Configuration();
         config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
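
For contrast with the non-EOS path exercised by the test above, a hedged sketch (not part of the commit) of how an exactly-once sink satisfies the uniqueness requirement the test asserts on; the bootstrap servers, prefix, and topic below are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

class EosSinkExample {
    static KafkaSink<String> build(String bootstrapServers, String uniquePrefix) {
        // EXACTLY_ONCE still requires a transactional id prefix that is unique per
        // sink; non-EOS sinks can omit it entirely, as the test above exercises.
        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(uniquePrefix) // e.g. "my-app-sink-1" (placeholder)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("example-topic") // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}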

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java

Lines changed: 1 addition & 30 deletions
@@ -23,10 +23,6 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
-import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
-import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
-import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
@@ -43,7 +39,6 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterEach;
@@ -59,7 +54,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -151,7 +145,7 @@ KafkaSink<Integer> createSink(Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster)
                 KafkaSink.<Integer>builder()
                         .setKafkaProducerConfig(getKafkaClientConfiguration())
                         .setTransactionalIdPrefix(TEST_PREFIX + writerIndex++)
-                        .setRecordSerializer(new DummyRecordSerializer());
+                        .setRecordSerializer(new IntegerRecordSerializer(topic));
         sinkBuilderAdjuster.accept(builder);
         return builder.build();
     }
@@ -235,29 +229,6 @@ public void setRestoredCheckpointId(long checkpointId) {
         }
     }
 
-    /** mock recordSerializer for KafkaSink. */
-    protected static class DummyRecordSerializer
-            implements KafkaRecordSerializationSchema<Integer>, KafkaDatasetFacetProvider {
-        @Override
-        public ProducerRecord<byte[], byte[]> serialize(
-                Integer element, KafkaSinkContext context, Long timestamp) {
-            if (element == null) {
-                // in general, serializers should be allowed to skip invalid elements
-                return null;
-            }
-            byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
-            return new ProducerRecord<>(topic, bytes, bytes);
-        }
-
-        @Override
-        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
-            return Optional.of(
-                    new DefaultKafkaDatasetFacet(
-                            DefaultKafkaDatasetIdentifier.ofTopics(
-                                    Collections.singletonList(topic))));
-        }
-    }
-
     /**
      * mock context for KafkaWriter#write(java.lang.Object,
      * org.apache.flink.api.connector.sink2.SinkWriter.Context).

flink-connector-kafka/src/test/resources/log4j2-test.properties

Lines changed: 4 additions & 2 deletions
@@ -43,12 +43,14 @@ logger.kafka.level = OFF
 # If you want to investigate test failures, overwrite the level as above
 logger.container.name = container
 logger.container.level = OFF
-logger.container.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.container.additivity = false
 logger.container.appenderRef.containerappender.ref = ContainerLogger
 
 logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
 logger.flinkenv.level = OFF
-logger.flinkenv.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.flinkenv.additivity = false
 logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
 
 appender.containerappender.name = ContainerLogger
