diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
index 20105062075d7..0139af98344a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
@@ -97,7 +97,7 @@ public void onMerge(W window, OnMergeContext ctx) throws Exception {
@Override
public String toString() {
- return "CountTrigger(" + maxCount + ")";
+ return "AsyncCountTrigger(" + maxCount + ")";
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java
new file mode 100644
index 0000000000000..7e0e8955405c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A converter from {@code Trigger} to {@code AsyncTrigger}.
+ *
+ *
Basic triggers (e.g., {@code CountTrigger}) are directly converted to their async version.
+ *
+ *
Async-support triggers which implement {@code AsyncTriggerConvertable} (e.g., {@code
+ * ProcessingTimeoutTrigger}) will use self-defined async version.
+ *
+ *
Other triggers are wrapped as an {@code AsyncTrigger}, whose internal functions are executed
+ * in sync mode.
+ */
+@Internal
+public interface AsyncTriggerConverter {
+
+ /**
+ * Convert to an {@code AsyncTrigger}. The default implementation is only a wrapper of the
+ * trigger, whose behaviours are all sync.
+ *
+ *
TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes @PublicEvolving.
+ *
+ * @return The {@code AsyncTrigger} for async state processing.
+ */
+ @Nonnull
+ default Object convertToAsync() {
+ return UserDefinedAsyncTrigger.of((Trigger, ?>) AsyncTriggerConverter.this);
+ }
+
+ @SuppressWarnings("unchecked")
+ static AsyncTrigger convertToAsync(Trigger trigger) {
+ if (trigger instanceof CountTrigger) {
+ return (AsyncTrigger)
+ AsyncCountTrigger.of(((CountTrigger>) trigger).getMaxCount());
+ } else if (trigger instanceof EventTimeTrigger) {
+ return (AsyncTrigger) AsyncEventTimeTrigger.create();
+ } else if (trigger instanceof ProcessingTimeTrigger) {
+ return (AsyncTrigger) AsyncProcessingTimeTrigger.create();
+ } else if (trigger instanceof PurgingTrigger) {
+ return (AsyncTrigger)
+ AsyncPurgingTrigger.of(
+ convertToAsync(((PurgingTrigger, ?>) trigger).getNestedTrigger()));
+ } else if (trigger instanceof AsyncTriggerConverter) {
+ return (AsyncTrigger) ((AsyncTriggerConverter) trigger).convertToAsync();
+ } else {
+ return UserDefinedAsyncTrigger.of(trigger);
+ }
+ }
+
+ /** Convert non-support user-defined trigger to {@code AsyncTrigger}. */
+ class UserDefinedAsyncTrigger extends AsyncTrigger {
+ private final Trigger userDefinedTrigger;
+
+ private UserDefinedAsyncTrigger(Trigger userDefinedTrigger) {
+ this.userDefinedTrigger = userDefinedTrigger;
+ }
+
+ @Override
+ public StateFuture onElement(
+ T element, long timestamp, W window, TriggerContext ctx) throws Exception {
+ return StateFutureUtils.completedFuture(
+ userDefinedTrigger.onElement(
+ element, timestamp, window, AsyncTriggerContextConvertor.of(ctx)));
+ }
+
+ @Override
+ public StateFuture onProcessingTime(long time, W window, TriggerContext ctx)
+ throws Exception {
+ return StateFutureUtils.completedFuture(
+ userDefinedTrigger.onProcessingTime(
+ time, window, AsyncTriggerContextConvertor.of(ctx)));
+ }
+
+ @Override
+ public StateFuture onEventTime(long time, W window, TriggerContext ctx)
+ throws Exception {
+ return StateFutureUtils.completedFuture(
+ userDefinedTrigger.onEventTime(
+ time, window, AsyncTriggerContextConvertor.of(ctx)));
+ }
+
+ @Override
+ public StateFuture clear(W window, TriggerContext ctx) throws Exception {
+ userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx));
+ return StateFutureUtils.completedVoidFuture();
+ }
+
+ @Override
+ public boolean isEndOfStreamTrigger() {
+ return userDefinedTrigger instanceof GlobalWindows.EndOfStreamTrigger;
+ }
+
+ public static AsyncTrigger of(
+ Trigger userDefinedTrigger) {
+ return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
+ }
+
+ /**
+ * A converter from {@link AsyncTrigger.TriggerContext} to {@link Trigger.TriggerContext}.
+ */
+ private static class AsyncTriggerContextConvertor implements Trigger.TriggerContext {
+
+ private final AsyncTrigger.TriggerContext asyncTriggerContext;
+
+ private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) {
+ this.asyncTriggerContext = asyncTriggerContext;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return asyncTriggerContext.getCurrentProcessingTime();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return asyncTriggerContext.getMetricGroup();
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return asyncTriggerContext.getCurrentWatermark();
+ }
+
+ @Override
+ public void registerProcessingTimeTimer(long time) {
+ asyncTriggerContext.registerProcessingTimeTimer(time);
+ }
+
+ @Override
+ public void registerEventTimeTimer(long time) {
+ asyncTriggerContext.registerEventTimeTimer(time);
+ }
+
+ @Override
+ public void deleteProcessingTimeTimer(long time) {
+ asyncTriggerContext.deleteProcessingTimeTimer(time);
+ }
+
+ @Override
+ public void deleteEventTimeTimer(long time) {
+ asyncTriggerContext.deleteEventTimeTimer(time);
+ }
+
+ @Override
+ public S getPartitionedState(StateDescriptor stateDescriptor) {
+ throw new UnsupportedOperationException(
+ "Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs.");
+ }
+
+ public static Trigger.TriggerContext of(
+ AsyncTrigger.TriggerContext asyncTriggerContext) {
+ return new AsyncTriggerContextConvertor(asyncTriggerContext);
+ }
+ }
+
+ @VisibleForTesting
+ public Trigger getUserDefinedTrigger() {
+ return userDefinedTrigger;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
index ee1b64d07bd2b..cdfa7ad21725a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
@@ -28,15 +28,11 @@
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction;
@@ -45,10 +41,6 @@
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction;
-import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
-import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
-import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
-import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -56,17 +48,10 @@
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.EndOfStreamTrigger;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -570,130 +555,4 @@ public String generateOperatorDescription(Function function1, @Nullable Function
public long getAllowedLateness() {
return allowedLateness;
}
-
- private static class UserDefinedAsyncTrigger extends AsyncTrigger {
- private final Trigger userDefinedTrigger;
-
- private UserDefinedAsyncTrigger(Trigger userDefinedTrigger) {
- this.userDefinedTrigger = userDefinedTrigger;
- }
-
- @Override
- public StateFuture onElement(
- T element, long timestamp, W window, TriggerContext ctx) throws Exception {
- return StateFutureUtils.completedFuture(
- userDefinedTrigger.onElement(
- element, timestamp, window, AsyncTriggerContextConvertor.of(ctx)));
- }
-
- @Override
- public StateFuture onProcessingTime(long time, W window, TriggerContext ctx)
- throws Exception {
- return StateFutureUtils.completedFuture(
- userDefinedTrigger.onProcessingTime(
- time, window, AsyncTriggerContextConvertor.of(ctx)));
- }
-
- @Override
- public StateFuture onEventTime(long time, W window, TriggerContext ctx)
- throws Exception {
- return StateFutureUtils.completedFuture(
- userDefinedTrigger.onEventTime(
- time, window, AsyncTriggerContextConvertor.of(ctx)));
- }
-
- @Override
- public StateFuture clear(W window, TriggerContext ctx) throws Exception {
- userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx));
- return StateFutureUtils.completedVoidFuture();
- }
-
- @Override
- public boolean isEndOfStreamTrigger() {
- return userDefinedTrigger instanceof EndOfStreamTrigger;
- }
-
- public static AsyncTrigger of(
- Trigger userDefinedTrigger) {
- return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
- }
- }
-
- private static class AsyncTriggerConverter {
-
- @SuppressWarnings("unchecked")
- public static AsyncTrigger convertToAsync(
- Trigger trigger) {
- if (trigger instanceof CountTrigger) {
- return (AsyncTrigger)
- AsyncCountTrigger.of(((CountTrigger>) trigger).getMaxCount());
- } else if (trigger instanceof EventTimeTrigger) {
- return (AsyncTrigger) AsyncEventTimeTrigger.create();
- } else if (trigger instanceof ProcessingTimeTrigger) {
- return (AsyncTrigger) AsyncProcessingTimeTrigger.create();
- } else if (trigger instanceof PurgingTrigger) {
- return (AsyncTrigger)
- AsyncPurgingTrigger.of(
- convertToAsync(
- ((PurgingTrigger, ?>) trigger).getNestedTrigger()));
- } else {
- return UserDefinedAsyncTrigger.of(trigger);
- }
- }
- }
-
- /** A converter from {@link AsyncTrigger.TriggerContext} to {@link Trigger.TriggerContext}. */
- private static class AsyncTriggerContextConvertor implements TriggerContext {
-
- private final AsyncTrigger.TriggerContext asyncTriggerContext;
-
- private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) {
- this.asyncTriggerContext = asyncTriggerContext;
- }
-
- @Override
- public long getCurrentProcessingTime() {
- return asyncTriggerContext.getCurrentProcessingTime();
- }
-
- @Override
- public MetricGroup getMetricGroup() {
- return asyncTriggerContext.getMetricGroup();
- }
-
- @Override
- public long getCurrentWatermark() {
- return asyncTriggerContext.getCurrentWatermark();
- }
-
- @Override
- public void registerProcessingTimeTimer(long time) {
- asyncTriggerContext.registerProcessingTimeTimer(time);
- }
-
- @Override
- public void registerEventTimeTimer(long time) {
- asyncTriggerContext.registerEventTimeTimer(time);
- }
-
- @Override
- public void deleteProcessingTimeTimer(long time) {
- asyncTriggerContext.deleteProcessingTimeTimer(time);
- }
-
- @Override
- public void deleteEventTimeTimer(long time) {
- asyncTriggerContext.deleteEventTimeTimer(time);
- }
-
- @Override
- public S getPartitionedState(StateDescriptor stateDescriptor) {
- throw new UnsupportedOperationException(
- "Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs.");
- }
-
- public static TriggerContext of(AsyncTrigger.TriggerContext asyncTriggerContext) {
- return new AsyncTriggerContextConvertor(asyncTriggerContext);
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java
new file mode 100644
index 0000000000000..dbeada9aa04ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/** Utility for testing {@link AsyncTrigger} behaviour. */
+public class AsyncTriggerTestHarness extends TriggerTestHarness {
+
+ private final AsyncTrigger trigger;
+
+ // Async adaptor to realStateBackend.
+ private final AsyncKeyedStateBackend asyncStateBackend;
+
+ /**
+ * Initialize test harness for async trigger.
+ *
+ *
The state backend is heap, which does not support async state operation. The tests use
+ * async state API, but all state operations execute in sync mode.
+ */
+ public AsyncTriggerTestHarness(AsyncTrigger trigger, TypeSerializer windowSerializer)
+ throws Exception {
+ super(null, windowSerializer);
+ this.trigger = trigger;
+
+ this.asyncStateBackend = new AsyncKeyedStateBackendAdaptor<>(stateBackend);
+ }
+
+ // ------------------------------------------------------------------------------
+ // Override TriggerTestHarness API
+ // ------------------------------------------------------------------------------
+
+ @Override
+ public TriggerResult processElement(StreamRecord element, W window) throws Exception {
+ return completeStateFuture(asyncProcessElement(element, window));
+ }
+
+ @Override
+ public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+ return completeStateFuture(asyncAdvanceProcessingTime(time, window));
+ }
+
+ @Override
+ public TriggerResult advanceWatermark(long time, W window) throws Exception {
+ return completeStateFuture(asyncAdvanceWatermark(time, window));
+ }
+
+ @Override
+ public Collection> advanceProcessingTime(long time) throws Exception {
+ return asyncAdvanceProcessingTime(time).stream()
+ .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1)))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Collection> advanceWatermark(long time) throws Exception {
+ return asyncAdvanceWatermark(time).stream()
+ .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1)))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public TriggerResult invokeOnEventTime(long timestamp, W window) throws Exception {
+ return completeStateFuture(asyncInvokeOnEventTime(timestamp, window));
+ }
+
+ @Override
+ public void mergeWindows(W targetWindow, Collection mergedWindows) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearTriggerState(W window) throws Exception {
+ completeStateFuture(asyncClearTriggerState(window));
+ }
+
+ // ------------------------------------------------------------------------------
+ // Using Async State API
+ // ------------------------------------------------------------------------------
+
+ StateFuture asyncProcessElement(StreamRecord element, W window)
+ throws Exception {
+ TestTriggerContext triggerContext =
+ new TestTriggerContext<>(
+ KEY, window, internalTimerService, asyncStateBackend, windowSerializer);
+ return trigger.onElement(
+ element.getValue(), element.getTimestamp(), window, triggerContext);
+ }
+
+ StateFuture asyncAdvanceProcessingTime(long time, W window) throws Exception {
+ Collection>> firings =
+ asyncAdvanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException(
+ "Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2> firing = firings.iterator().next();
+
+ if (!firing.f0.equals(window)) {
+ throw new IllegalStateException("Trigger fired for another window.");
+ }
+
+ return firing.f1;
+ }
+
+ StateFuture asyncAdvanceWatermark(long time, W window) throws Exception {
+ Collection>> firings = asyncAdvanceWatermark(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException(
+ "Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2> firing = firings.iterator().next();
+
+ if (!firing.f0.equals(window)) {
+ throw new IllegalStateException("Trigger fired for another window.");
+ }
+
+ return firing.f1;
+ }
+
+ Collection>> asyncAdvanceProcessingTime(long time)
+ throws Exception {
+ Collection> firedTimers =
+ internalTimerService.advanceProcessingTime(time);
+
+ Collection>> result = new ArrayList<>();
+
+ for (TestInternalTimerService.Timer timer : firedTimers) {
+ TestTriggerContext triggerContext =
+ new TestTriggerContext<>(
+ KEY,
+ timer.getNamespace(),
+ internalTimerService,
+ asyncStateBackend,
+ windowSerializer);
+
+ StateFuture triggerResult =
+ trigger.onProcessingTime(
+ timer.getTimestamp(), timer.getNamespace(), triggerContext);
+
+ result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+ }
+
+ return result;
+ }
+
+ Collection>> asyncAdvanceWatermark(long time)
+ throws Exception {
+ Collection> firedTimers =
+ internalTimerService.advanceWatermark(time);
+
+ Collection>> result = new ArrayList<>();
+
+ for (TestInternalTimerService.Timer timer : firedTimers) {
+ StateFuture triggerResult = asyncInvokeOnEventTime(timer);
+ result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+ }
+
+ return result;
+ }
+
+ private StateFuture asyncInvokeOnEventTime(
+ TestInternalTimerService.Timer timer) throws Exception {
+ TestTriggerContext triggerContext =
+ new TestTriggerContext<>(
+ KEY,
+ timer.getNamespace(),
+ internalTimerService,
+ asyncStateBackend,
+ windowSerializer);
+
+ return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
+ }
+
+ StateFuture asyncInvokeOnEventTime(long timestamp, W window) throws Exception {
+ TestInternalTimerService.Timer timer =
+ new TestInternalTimerService.Timer<>(timestamp, KEY, window);
+
+ return asyncInvokeOnEventTime(timer);
+ }
+
+ StateFuture asyncClearTriggerState(W window) throws Exception {
+ TestTriggerContext triggerContext =
+ new TestTriggerContext<>(
+ KEY, window, internalTimerService, asyncStateBackend, windowSerializer);
+ return trigger.clear(window, triggerContext);
+ }
+
+ // ------------------------------------------------------------------------------
+ // Context
+ // ------------------------------------------------------------------------------
+
+ private static class TestTriggerContext
+ implements AsyncTrigger.TriggerContext {
+
+ protected final InternalTimerService timerService;
+ protected final AsyncKeyedStateBackend stateBackend;
+ protected final K key;
+ protected final W window;
+ protected final TypeSerializer windowSerializer;
+
+ TestTriggerContext(
+ K key,
+ W window,
+ InternalTimerService timerService,
+ AsyncKeyedStateBackend stateBackend,
+ TypeSerializer windowSerializer) {
+ this.key = key;
+ this.window = window;
+ this.timerService = timerService;
+ this.stateBackend = stateBackend;
+ this.windowSerializer = windowSerializer;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return timerService.currentProcessingTime();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return null;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return timerService.currentWatermark();
+ }
+
+ @Override
+ public void registerProcessingTimeTimer(long time) {
+ timerService.registerProcessingTimeTimer(window, time);
+ }
+
+ @Override
+ public void registerEventTimeTimer(long time) {
+ timerService.registerEventTimeTimer(window, time);
+ }
+
+ @Override
+ public void deleteProcessingTimeTimer(long time) {
+ timerService.deleteProcessingTimeTimer(window, time);
+ }
+
+ @Override
+ public void deleteEventTimeTimer(long time) {
+ timerService.deleteEventTimeTimer(window, time);
+ }
+
+ @Override
+ public S getPartitionedState(StateDescriptor stateDescriptor) {
+ try {
+ // single key (KEY), no need declaration.
+ return stateBackend.getOrCreateKeyedState(
+ window, windowSerializer, stateDescriptor);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+ }
+
+ static T completeStateFuture(StateFuture future) {
+ InternalAsyncFuture internalAsyncFuture = (InternalAsyncFuture) future;
+
+ // The harness executes state operations in sync mode, any StateFuture should be completed.
+ Preconditions.checkArgument(internalAsyncFuture.isDone());
+ return internalAsyncFuture.get();
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 02454dd4d6822..9642500bc4806 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -54,13 +54,13 @@
/** Utility for testing {@link Trigger} behaviour. */
public class TriggerTestHarness {
- private static final Integer KEY = 1;
+ protected static final Integer KEY = 1;
private final Trigger trigger;
- private final TypeSerializer windowSerializer;
+ protected final TypeSerializer windowSerializer;
- private final HeapKeyedStateBackend stateBackend;
- private final TestInternalTimerService internalTimerService;
+ protected final HeapKeyedStateBackend stateBackend;
+ protected final TestInternalTimerService internalTimerService;
public TriggerTestHarness(Trigger trigger, TypeSerializer windowSerializer)
throws Exception {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
new file mode 100644
index 0000000000000..ea602cb172fc3
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * A {@link AsyncTrigger} that can turn any {@link AsyncTrigger} into a timeout {@code
+ * AsyncTrigger}.
+ *
+ *
On the first arriving element a configurable processing-time timeout will be set. Using {@link
+ * #of(AsyncTrigger, Duration, boolean, boolean)}, you can also re-new the timer for each arriving
+ * element by specifying {@code resetTimerOnNewRecord} and you can specify whether {@link
+ * AsyncTrigger#clear(Window, AsyncTrigger.TriggerContext)} should be called on timout via {@code
+ * shouldClearOnTimeout}.
+ *
+ * @param The type of elements on which this trigger can operate.
+ * @param The type of {@link Window} on which this trigger can operate.
+ */
+@Experimental
+public class AsyncProcessingTimeoutTrigger extends AsyncTrigger {
+ private static final long serialVersionUID = 1L;
+
+ private final AsyncTrigger nestedTrigger;
+ private final long interval;
+ private final boolean resetTimerOnNewRecord;
+ private final boolean shouldClearOnTimeout;
+
+ private final ValueStateDescriptor timeoutStateDesc;
+
+ public AsyncProcessingTimeoutTrigger(
+ AsyncTrigger nestedTrigger,
+ long interval,
+ boolean resetTimerOnNewRecord,
+ boolean shouldClearOnTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.resetTimerOnNewRecord = resetTimerOnNewRecord;
+ this.shouldClearOnTimeout = shouldClearOnTimeout;
+
+ this.timeoutStateDesc = new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public StateFuture onElement(
+ T element, long timestamp, W window, TriggerContext ctx) throws Exception {
+ return this.nestedTrigger
+ .onElement(element, timestamp, window, ctx)
+ .thenConditionallyCompose(
+ TriggerResult::isFire,
+ triggerResult -> this.clear(window, ctx).thenApply(ignore -> triggerResult),
+ triggerResult -> {
+ ValueState timeoutState =
+ ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
+
+ return timeoutState
+ .asyncValue()
+ .thenConditionallyCompose(
+ Objects::nonNull,
+ timeoutTimestamp -> {
+ if (resetTimerOnNewRecord) {
+ ctx.deleteProcessingTimeTimer(timeoutTimestamp);
+ return timeoutState
+ .asyncClear()
+ .thenApply(ignore -> null);
+ } else {
+ return StateFutureUtils.completedFuture(
+ timeoutTimestamp);
+ }
+ })
+ .thenConditionallyCompose(
+ tuple -> tuple.f1 /*timeoutTimestamp*/ == null,
+ ignore ->
+ timeoutState
+ .asyncUpdate(nextFireTimestamp)
+ .thenAccept(
+ ignore2 ->
+ ctx
+ .registerProcessingTimeTimer(
+ nextFireTimestamp)))
+ .thenApply(ignore -> triggerResult);
+ })
+ .thenApply(tuple -> (TriggerResult) tuple.f1);
+ }
+
+ @Override
+ public StateFuture onProcessingTime(long time, W window, TriggerContext ctx)
+ throws Exception {
+ return this.nestedTrigger
+ .onProcessingTime(time, window, ctx)
+ .thenCompose(
+ triggerResult -> {
+ TriggerResult finalResult =
+ triggerResult.isPurge()
+ ? TriggerResult.FIRE_AND_PURGE
+ : TriggerResult.FIRE;
+ return shouldClearOnTimeout
+ ? this.clear(window, ctx).thenApply(ignore -> finalResult)
+ : StateFutureUtils.completedFuture(finalResult);
+ });
+ }
+
+ @Override
+ public StateFuture onEventTime(long time, W window, TriggerContext ctx)
+ throws Exception {
+ return this.nestedTrigger
+ .onEventTime(time, window, ctx)
+ .thenCompose(
+ triggerResult -> {
+ TriggerResult finalResult =
+ triggerResult.isPurge()
+ ? TriggerResult.FIRE_AND_PURGE
+ : TriggerResult.FIRE;
+ return shouldClearOnTimeout
+ ? this.clear(window, ctx).thenApply(ignore -> finalResult)
+ : StateFutureUtils.completedFuture(finalResult);
+ });
+ }
+
+ @Override
+ public StateFuture clear(W window, TriggerContext ctx) throws Exception {
+ ValueState timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc);
+ return timeoutTimestampState
+ .asyncValue()
+ .thenConditionallyCompose(
+ Objects::nonNull,
+ timeoutTimestamp -> {
+ ctx.deleteProcessingTimeTimer(timeoutTimestamp);
+ return timeoutTimestampState.asyncClear();
+ })
+ .thenCompose(ignore -> this.nestedTrigger.clear(window, ctx));
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncTimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+ }
+
+ /**
+ * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the inner trigger is
+ * fired or when the timeout timer fires.
+ *
+ *
For example: {@code AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), 100)}, will
+ * create a AsyncCountTrigger with timeout of 100 millis. So, if the first record arrives at
+ * time {@code t}, and the second record arrives at time {@code t+50 }, the trigger will fire
+ * when the third record arrives or when the time is {code t+100} (timeout).
+ *
+ * @param nestedTrigger the nested {@link AsyncTrigger}
+ * @param timeout the timeout interval
+ * @return {@link AsyncProcessingTimeoutTrigger} with the above configuration.
+ */
+ public static AsyncProcessingTimeoutTrigger of(
+ AsyncTrigger nestedTrigger, Duration timeout) {
+ return new AsyncProcessingTimeoutTrigger<>(nestedTrigger, timeout.toMillis(), false, true);
+ }
+
+ /**
+ * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the inner trigger is
+ * fired or when the timeout timer fires.
+ *
+ *
For example: {@code AsyncProcessingTimeoutTrigger.of(CountTrigger.of(3), 100, false,
+ * true)}, will create a AsyncCountTrigger with timeout of 100 millis. So, if the first record
+ * arrives at time {@code t}, and the second record arrives at time {@code t+50 }, the trigger
+ * will fire when the third record arrives or when the time is {code t+100} (timeout).
+ *
+ * @param nestedTrigger the nested {@link AsyncTrigger}
+ * @param timeout the timeout interval
+ * @param resetTimerOnNewRecord each time a new element arrives, reset the timer and start a new
+ * one
+ * @param shouldClearOnTimeout whether to call {@link AsyncTrigger#clear(Window,
+ * AsyncTrigger.TriggerContext)} when the processing-time timer fires
+ * @param The type of the element.
+ * @param The type of {@link Window Windows} on which this trigger can operate.
+ * @return {@link AsyncProcessingTimeoutTrigger} with the above configuration.
+ */
+ public static AsyncProcessingTimeoutTrigger of(
+ AsyncTrigger nestedTrigger,
+ Duration timeout,
+ boolean resetTimerOnNewRecord,
+ boolean shouldClearOnTimeout) {
+ return new AsyncProcessingTimeoutTrigger<>(
+ nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
+ }
+
+ @VisibleForTesting
+ public AsyncTrigger getNestedTrigger() {
+ return nestedTrigger;
+ }
+
+ @VisibleForTesting
+ public long getInterval() {
+ return interval;
+ }
+
+ @VisibleForTesting
+ public boolean isResetTimerOnNewRecord() {
+ return resetTimerOnNewRecord;
+ }
+
+ @VisibleForTesting
+ public boolean isShouldClearOnTimeout() {
+ return shouldClearOnTimeout;
+ }
+
+ @VisibleForTesting
+ public ValueStateDescriptor getTimeoutStateDesc() {
+ return timeoutStateDesc;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
index 268edfbf14b14..7d972e8864374 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
@@ -23,6 +23,9 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter;
+
+import javax.annotation.Nonnull;
import java.time.Duration;
@@ -39,7 +42,8 @@
* @param The type of {@link Window} on which this trigger can operate.
*/
@PublicEvolving
-public class ProcessingTimeoutTrigger extends Trigger {
+public class ProcessingTimeoutTrigger extends Trigger
+ implements AsyncTriggerConverter {
private static final long serialVersionUID = 1L;
@@ -169,4 +173,14 @@ public static ProcessingTimeoutTrigger of(
return new ProcessingTimeoutTrigger<>(
nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
}
+
+ @Override
+ @Nonnull
+ public Object convertToAsync() {
+ return AsyncProcessingTimeoutTrigger.of(
+ AsyncTriggerConverter.convertToAsync(this.nestedTrigger),
+ Duration.ofMillis(interval),
+ resetTimerOnNewRecord,
+ shouldClearOnTimeout);
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
new file mode 100644
index 0000000000000..cf1fe75400b37
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@code AsyncTriggerConverter}. */
+public class AsyncTriggerConverterTest {
+ private static class DummyTriggerWithoutAsyncConverter extends Trigger