diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index a05d32930030c..11dc99f7e6cc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; @@ -102,6 +103,19 @@ public WindowedStream trigger(Trigger trigger) { return this; } + /** + * Sets the {@code AsyncTrigger} that should be used to trigger window emission. + * + *

Will automatically enable async state for {@code WindowedStream}. + */ + @Experimental + public WindowedStream trigger(AsyncTrigger trigger) { + enableAsyncState(); + + builder.asyncTrigger(trigger); + return this; + } + /** * Sets the time by which elements are allowed to be late. Elements that arrive behind the * watermark by more than the specified time will be dropped. By default, the allowed lateness diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 28cc3b570af5b..ef0c9ef8053c1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -31,11 +31,14 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -54,9 +57,12 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; @@ -1059,9 +1065,10 @@ public void process( new Tuple2<>("hello", 1)); } - @Test + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) @SuppressWarnings("rawtypes") - void testReduceWithCustomTrigger() throws Exception { + void testReduceWithCustomTrigger(boolean enableAsyncState) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> source = @@ -1070,134 +1077,199 @@ void testReduceWithCustomTrigger() throws Exception { DummyReducer reducer = new DummyReducer(); DataStream> window1 = - source.keyBy(x -> x.f0) - .window( - SlidingEventTimeWindows.of( - Duration.ofSeconds(1), Duration.ofMillis(100))) - .trigger(CountTrigger.of(1)) - .reduce(reducer); + enableAsyncState + ? source.keyBy(x -> x.f0) + .window( + SlidingEventTimeWindows.of( + Duration.ofSeconds(1), Duration.ofMillis(100))) + .trigger(AsyncCountTrigger.of(1)) + .reduce(reducer) + : source.keyBy(x -> x.f0) + .window( + SlidingEventTimeWindows.of( + Duration.ofSeconds(1), Duration.ofMillis(100))) + .trigger(CountTrigger.of(1)) + .reduce(reducer); OneInputTransformation, Tuple2> transform = (OneInputTransformation, Tuple2>) window1.getTransformation(); OneInputStreamOperator, Tuple2> operator = transform.getOperator(); - assertThat(operator).isInstanceOf(WindowOperator.class); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; - assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class); - assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class); - assertThat(winOperator.getStateDescriptor()).isInstanceOf(ReducingStateDescriptor.class); + assertThat(((AbstractStreamOperator) operator).isAsyncKeyOrderedProcessingEnabled()) + .isEqualTo(enableAsyncState); + + KeySelector, String> keySelector; + if (enableAsyncState) { + assertThat(operator).isInstanceOf(AsyncWindowOperator.class); + AsyncWindowOperator, ?, ?, ?> winOperator = + (AsyncWindowOperator, ?, ?, ?>) operator; + assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class); + assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class); + assertThat(winOperator.getStateDescriptor()) + .isInstanceOf( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor.class); + + keySelector = winOperator.getKeySelector(); + } else { + assertThat(operator).isInstanceOf(WindowOperator.class); + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; + assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class); + assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class); + assertThat(winOperator.getStateDescriptor()) + .isInstanceOf(ReducingStateDescriptor.class); + + keySelector = winOperator.getKeySelector(); + } processElementAndEnsureOutput( - winOperator, - winOperator.getKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO, - new Tuple2<>("hello", 1)); + operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } - @Test + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) @SuppressWarnings("rawtypes") - void testApplyWithCustomTrigger() throws Exception { + void testApplyWithCustomTrigger(boolean enableAsyncState) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> source = env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + WindowFunction, Tuple2, String, TimeWindow> + windowFunc = + new WindowFunction<>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + String key, + TimeWindow window, + Iterable> values, + Collector> out) { + for (Tuple2 in : values) { + out.collect(in); + } + } + }; DataStream> window1 = - source.keyBy(new TupleKeySelector()) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) - .trigger(CountTrigger.of(1)) - .apply( - new WindowFunction< - Tuple2, - Tuple2, - String, - TimeWindow>() { - private static final long serialVersionUID = 1L; - - @Override - public void apply( - String key, - TimeWindow window, - Iterable> values, - Collector> out) - throws Exception { - for (Tuple2 in : values) { - out.collect(in); - } - } - }); + enableAsyncState + ? source.keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) + .trigger(AsyncCountTrigger.of(1)) + .apply(windowFunc) + : source.keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) + .trigger(CountTrigger.of(1)) + .apply(windowFunc); OneInputTransformation, Tuple2> transform = (OneInputTransformation, Tuple2>) window1.getTransformation(); OneInputStreamOperator, Tuple2> operator = transform.getOperator(); - assertThat(operator).isInstanceOf(WindowOperator.class); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; - assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class); - assertThat(winOperator.getWindowAssigner()).isInstanceOf(TumblingEventTimeWindows.class); - assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class); + assertThat(((AbstractStreamOperator) operator).isAsyncKeyOrderedProcessingEnabled()) + .isEqualTo(enableAsyncState); + + KeySelector, String> keySelector; + if (enableAsyncState) { + assertThat(operator).isInstanceOf(AsyncWindowOperator.class); + AsyncWindowOperator, ?, ?, ?> winOperator = + (AsyncWindowOperator, ?, ?, ?>) operator; + assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class); + assertThat(winOperator.getWindowAssigner()) + .isInstanceOf(TumblingEventTimeWindows.class); + assertThat(winOperator.getStateDescriptor()) + .isInstanceOf(org.apache.flink.api.common.state.v2.ListStateDescriptor.class); + + keySelector = winOperator.getKeySelector(); + } else { + assertThat(operator).isInstanceOf(WindowOperator.class); + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; + assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class); + assertThat(winOperator.getWindowAssigner()) + .isInstanceOf(TumblingEventTimeWindows.class); + assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class); + + keySelector = winOperator.getKeySelector(); + } processElementAndEnsureOutput( - winOperator, - winOperator.getKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO, - new Tuple2<>("hello", 1)); + operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } - @Test + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) @SuppressWarnings("rawtypes") - void testProcessWithCustomTrigger() throws Exception { + void testProcessWithCustomTrigger(boolean enableAsyncState) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> source = env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + ProcessWindowFunction, Tuple2, String, TimeWindow> + windowFunc = + new ProcessWindowFunction<>() { + private static final long serialVersionUID = 1L; + + @Override + public void process( + String key, + Context ctx, + Iterable> values, + Collector> out) + throws Exception { + for (Tuple2 in : values) { + out.collect(in); + } + } + }; DataStream> window1 = - source.keyBy(new TupleKeySelector()) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) - .trigger(CountTrigger.of(1)) - .process( - new ProcessWindowFunction< - Tuple2, - Tuple2, - String, - TimeWindow>() { - private static final long serialVersionUID = 1L; - - @Override - public void process( - String key, - Context ctx, - Iterable> values, - Collector> out) - throws Exception { - for (Tuple2 in : values) { - out.collect(in); - } - } - }); + enableAsyncState + ? source.keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) + .trigger(AsyncCountTrigger.of(1)) + .process(windowFunc) + : source.keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) + .trigger(CountTrigger.of(1)) + .process(windowFunc); OneInputTransformation, Tuple2> transform = (OneInputTransformation, Tuple2>) window1.getTransformation(); OneInputStreamOperator, Tuple2> operator = transform.getOperator(); - assertThat(operator).isInstanceOf(WindowOperator.class); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; - assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class); - assertThat(winOperator.getWindowAssigner()).isInstanceOf(TumblingEventTimeWindows.class); - assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class); + assertThat(((AbstractStreamOperator) operator).isAsyncKeyOrderedProcessingEnabled()) + .isEqualTo(enableAsyncState); + + KeySelector, String> keySelector; + if (enableAsyncState) { + assertThat(operator).isInstanceOf(AsyncWindowOperator.class); + AsyncWindowOperator, ?, ?, ?> winOperator = + (AsyncWindowOperator, ?, ?, ?>) operator; + assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class); + assertThat(winOperator.getWindowAssigner()) + .isInstanceOf(TumblingEventTimeWindows.class); + assertThat(winOperator.getStateDescriptor()) + .isInstanceOf(org.apache.flink.api.common.state.v2.ListStateDescriptor.class); + + keySelector = winOperator.getKeySelector(); + } else { + assertThat(operator).isInstanceOf(WindowOperator.class); + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; + assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class); + assertThat(winOperator.getWindowAssigner()) + .isInstanceOf(TumblingEventTimeWindows.class); + assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class); + + keySelector = winOperator.getKeySelector(); + } processElementAndEnsureOutput( - winOperator, - winOperator.getKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO, - new Tuple2<>("hello", 1)); + operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } @Test @@ -1486,9 +1558,14 @@ private static void processElementAndEnsureOutput( TypeInformation keyType, IN element) throws Exception { - + boolean enableAsyncState = + ((AbstractStreamOperator) operator).isAsyncKeyOrderedProcessingEnabled(); KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); + enableAsyncState + ? AsyncKeyedOneInputStreamOperatorTestHarness.create( + operator, keySelector, keyType) + : new KeyedOneInputStreamOperatorTestHarness<>( + operator, keySelector, keyType); if (operator instanceof OutputTypeConfigurable) { // use a dummy type since window functions just need the ExecutionConfig