From 5d6ddc9b1be89f89bc770e42bc2ae45eb31f44f3 Mon Sep 17 00:00:00 2001 From: xiarui Date: Thu, 25 Sep 2025 11:17:47 +0800 Subject: [PATCH 1/4] [FLINK-38364][streaming-java] Implement async state version of ProcessingTimeoutTrigger --- .../windowing/triggers/AsyncCountTrigger.java | 2 +- .../windowing/AsyncTriggerConvertable.java | 40 +++ .../windowing/AsyncTriggerConverter.java | 179 ++++++++++ .../windowing/WindowOperatorBuilder.java | 141 -------- .../windowing/AsyncTriggerTestHarness.java | 308 ++++++++++++++++++ .../windowing/TriggerTestHarness.java | 8 +- .../AsyncProcessingTimeoutTrigger.java | 237 ++++++++++++++ .../triggers/ProcessingTimeoutTrigger.java | 18 +- .../ProcessingTimeoutTriggerTest.java | 100 ++++-- 9 files changed, 858 insertions(+), 175 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java index 20105062075d7..0139af98344a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java @@ -97,7 +97,7 @@ public void onMerge(W window, OnMergeContext ctx) throws Exception { @Override public String toString() { - return "CountTrigger(" + maxCount + ")"; + return "AsyncCountTrigger(" + maxCount + ")"; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java new file mode 100644 index 0000000000000..ec7474ae42b53 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import javax.annotation.Nonnull; + +/** + * Interface for declaring the ability to convert a sync {@code Trigger} to {@code AsyncTrigger}. + * The trigger conversion happens in {@code AsyncTriggerConverter}. + */ +@Experimental +public interface AsyncTriggerConvertable { + /** + * Convert to an {@code AsyncTrigger}. + * + * @return The {@code AsyncTrigger} for async state processing. + */ + @Nonnull + AsyncTrigger convertToAsync(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java new file mode 100644 index 0000000000000..dc0d2c8f07f35 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * A converter from {@code Trigger} to {@code AsyncTrigger}. + * + *

Basic triggers (e.g., {@code CountTrigger}) are directly converted to their async version. + * + *

Async-support triggers which implement {@code AsyncTriggerConvertable} (e.g., {@code + * ProcessingTimeoutTrigger}) will use self-defined async version. + * + *

Other triggers are wrapped as an {@code AsyncTrigger}, whose internal functions are executed + * in sync mode. + */ +public class AsyncTriggerConverter { + + @SuppressWarnings("unchecked") + public static AsyncTrigger convertToAsync(Trigger trigger) { + if (trigger instanceof CountTrigger) { + return (AsyncTrigger) + AsyncCountTrigger.of(((CountTrigger) trigger).getMaxCount()); + } else if (trigger instanceof EventTimeTrigger) { + return (AsyncTrigger) AsyncEventTimeTrigger.create(); + } else if (trigger instanceof ProcessingTimeTrigger) { + return (AsyncTrigger) AsyncProcessingTimeTrigger.create(); + } else if (trigger instanceof PurgingTrigger) { + return (AsyncTrigger) + AsyncPurgingTrigger.of( + convertToAsync(((PurgingTrigger) trigger).getNestedTrigger())); + } else if (trigger instanceof AsyncTriggerConvertable) { + return ((AsyncTriggerConvertable) trigger).convertToAsync(); + } else { + return UserDefinedAsyncTrigger.of(trigger); + } + } + + /** Convert non-support user-defined trigger to {@code AsyncTrigger}. */ + private static class UserDefinedAsyncTrigger extends AsyncTrigger { + private final Trigger userDefinedTrigger; + + private UserDefinedAsyncTrigger(Trigger userDefinedTrigger) { + this.userDefinedTrigger = userDefinedTrigger; + } + + @Override + public StateFuture onElement( + T element, long timestamp, W window, TriggerContext ctx) throws Exception { + return StateFutureUtils.completedFuture( + userDefinedTrigger.onElement( + element, timestamp, window, AsyncTriggerContextConvertor.of(ctx))); + } + + @Override + public StateFuture onProcessingTime(long time, W window, TriggerContext ctx) + throws Exception { + return StateFutureUtils.completedFuture( + userDefinedTrigger.onProcessingTime( + time, window, AsyncTriggerContextConvertor.of(ctx))); + } + + @Override + public StateFuture onEventTime(long time, W window, TriggerContext ctx) + throws Exception { + return StateFutureUtils.completedFuture( + userDefinedTrigger.onEventTime( + time, window, AsyncTriggerContextConvertor.of(ctx))); + } + + @Override + public StateFuture clear(W window, TriggerContext ctx) throws Exception { + userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx)); + return StateFutureUtils.completedVoidFuture(); + } + + @Override + public boolean isEndOfStreamTrigger() { + return userDefinedTrigger instanceof GlobalWindows.EndOfStreamTrigger; + } + + public static AsyncTrigger of( + Trigger userDefinedTrigger) { + return new UserDefinedAsyncTrigger<>(userDefinedTrigger); + } + + /** + * A converter from {@link AsyncTrigger.TriggerContext} to {@link Trigger.TriggerContext}. + */ + private static class AsyncTriggerContextConvertor implements Trigger.TriggerContext { + + private final AsyncTrigger.TriggerContext asyncTriggerContext; + + private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) { + this.asyncTriggerContext = asyncTriggerContext; + } + + @Override + public long getCurrentProcessingTime() { + return asyncTriggerContext.getCurrentProcessingTime(); + } + + @Override + public MetricGroup getMetricGroup() { + return asyncTriggerContext.getMetricGroup(); + } + + @Override + public long getCurrentWatermark() { + return asyncTriggerContext.getCurrentWatermark(); + } + + @Override + public void registerProcessingTimeTimer(long time) { + asyncTriggerContext.registerProcessingTimeTimer(time); + } + + @Override + public void registerEventTimeTimer(long time) { + asyncTriggerContext.registerEventTimeTimer(time); + } + + @Override + public void deleteProcessingTimeTimer(long time) { + asyncTriggerContext.deleteProcessingTimeTimer(time); + } + + @Override + public void deleteEventTimeTimer(long time) { + asyncTriggerContext.deleteEventTimeTimer(time); + } + + @Override + public S getPartitionedState(StateDescriptor stateDescriptor) { + throw new UnsupportedOperationException( + "Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs."); + } + + public static Trigger.TriggerContext of( + AsyncTrigger.TriggerContext asyncTriggerContext) { + return new AsyncTriggerContextConvertor(asyncTriggerContext); + } + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java index ee1b64d07bd2b..cdfa7ad21725a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java @@ -28,15 +28,11 @@ import org.apache.flink.api.common.state.AppendingState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.core.state.StateFutureUtils; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator; import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator; import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction; @@ -45,10 +41,6 @@ import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction; import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction; import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction; -import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger; -import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger; -import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger; -import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger; import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; @@ -56,17 +48,10 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.EndOfStreamTrigger; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; -import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; -import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; -import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext; -import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; @@ -570,130 +555,4 @@ public String generateOperatorDescription(Function function1, @Nullable Function public long getAllowedLateness() { return allowedLateness; } - - private static class UserDefinedAsyncTrigger extends AsyncTrigger { - private final Trigger userDefinedTrigger; - - private UserDefinedAsyncTrigger(Trigger userDefinedTrigger) { - this.userDefinedTrigger = userDefinedTrigger; - } - - @Override - public StateFuture onElement( - T element, long timestamp, W window, TriggerContext ctx) throws Exception { - return StateFutureUtils.completedFuture( - userDefinedTrigger.onElement( - element, timestamp, window, AsyncTriggerContextConvertor.of(ctx))); - } - - @Override - public StateFuture onProcessingTime(long time, W window, TriggerContext ctx) - throws Exception { - return StateFutureUtils.completedFuture( - userDefinedTrigger.onProcessingTime( - time, window, AsyncTriggerContextConvertor.of(ctx))); - } - - @Override - public StateFuture onEventTime(long time, W window, TriggerContext ctx) - throws Exception { - return StateFutureUtils.completedFuture( - userDefinedTrigger.onEventTime( - time, window, AsyncTriggerContextConvertor.of(ctx))); - } - - @Override - public StateFuture clear(W window, TriggerContext ctx) throws Exception { - userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx)); - return StateFutureUtils.completedVoidFuture(); - } - - @Override - public boolean isEndOfStreamTrigger() { - return userDefinedTrigger instanceof EndOfStreamTrigger; - } - - public static AsyncTrigger of( - Trigger userDefinedTrigger) { - return new UserDefinedAsyncTrigger<>(userDefinedTrigger); - } - } - - private static class AsyncTriggerConverter { - - @SuppressWarnings("unchecked") - public static AsyncTrigger convertToAsync( - Trigger trigger) { - if (trigger instanceof CountTrigger) { - return (AsyncTrigger) - AsyncCountTrigger.of(((CountTrigger) trigger).getMaxCount()); - } else if (trigger instanceof EventTimeTrigger) { - return (AsyncTrigger) AsyncEventTimeTrigger.create(); - } else if (trigger instanceof ProcessingTimeTrigger) { - return (AsyncTrigger) AsyncProcessingTimeTrigger.create(); - } else if (trigger instanceof PurgingTrigger) { - return (AsyncTrigger) - AsyncPurgingTrigger.of( - convertToAsync( - ((PurgingTrigger) trigger).getNestedTrigger())); - } else { - return UserDefinedAsyncTrigger.of(trigger); - } - } - } - - /** A converter from {@link AsyncTrigger.TriggerContext} to {@link Trigger.TriggerContext}. */ - private static class AsyncTriggerContextConvertor implements TriggerContext { - - private final AsyncTrigger.TriggerContext asyncTriggerContext; - - private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) { - this.asyncTriggerContext = asyncTriggerContext; - } - - @Override - public long getCurrentProcessingTime() { - return asyncTriggerContext.getCurrentProcessingTime(); - } - - @Override - public MetricGroup getMetricGroup() { - return asyncTriggerContext.getMetricGroup(); - } - - @Override - public long getCurrentWatermark() { - return asyncTriggerContext.getCurrentWatermark(); - } - - @Override - public void registerProcessingTimeTimer(long time) { - asyncTriggerContext.registerProcessingTimeTimer(time); - } - - @Override - public void registerEventTimeTimer(long time) { - asyncTriggerContext.registerEventTimeTimer(time); - } - - @Override - public void deleteProcessingTimeTimer(long time) { - asyncTriggerContext.deleteProcessingTimeTimer(time); - } - - @Override - public void deleteEventTimeTimer(long time) { - asyncTriggerContext.deleteEventTimeTimer(time); - } - - @Override - public S getPartitionedState(StateDescriptor stateDescriptor) { - throw new UnsupportedOperationException( - "Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs."); - } - - public static TriggerContext of(AsyncTrigger.TriggerContext asyncTriggerContext) { - return new AsyncTriggerContextConvertor(asyncTriggerContext); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java new file mode 100644 index 0000000000000..dbeada9aa04ef --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.asyncprocessing.InternalAsyncFuture; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TestInternalTimerService; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.stream.Collectors; + +/** Utility for testing {@link AsyncTrigger} behaviour. */ +public class AsyncTriggerTestHarness extends TriggerTestHarness { + + private final AsyncTrigger trigger; + + // Async adaptor to realStateBackend. + private final AsyncKeyedStateBackend asyncStateBackend; + + /** + * Initialize test harness for async trigger. + * + *

The state backend is heap, which does not support async state operation. The tests use + * async state API, but all state operations execute in sync mode. + */ + public AsyncTriggerTestHarness(AsyncTrigger trigger, TypeSerializer windowSerializer) + throws Exception { + super(null, windowSerializer); + this.trigger = trigger; + + this.asyncStateBackend = new AsyncKeyedStateBackendAdaptor<>(stateBackend); + } + + // ------------------------------------------------------------------------------ + // Override TriggerTestHarness API + // ------------------------------------------------------------------------------ + + @Override + public TriggerResult processElement(StreamRecord element, W window) throws Exception { + return completeStateFuture(asyncProcessElement(element, window)); + } + + @Override + public TriggerResult advanceProcessingTime(long time, W window) throws Exception { + return completeStateFuture(asyncAdvanceProcessingTime(time, window)); + } + + @Override + public TriggerResult advanceWatermark(long time, W window) throws Exception { + return completeStateFuture(asyncAdvanceWatermark(time, window)); + } + + @Override + public Collection> advanceProcessingTime(long time) throws Exception { + return asyncAdvanceProcessingTime(time).stream() + .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1))) + .collect(Collectors.toList()); + } + + @Override + public Collection> advanceWatermark(long time) throws Exception { + return asyncAdvanceWatermark(time).stream() + .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1))) + .collect(Collectors.toList()); + } + + @Override + public TriggerResult invokeOnEventTime(long timestamp, W window) throws Exception { + return completeStateFuture(asyncInvokeOnEventTime(timestamp, window)); + } + + @Override + public void mergeWindows(W targetWindow, Collection mergedWindows) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void clearTriggerState(W window) throws Exception { + completeStateFuture(asyncClearTriggerState(window)); + } + + // ------------------------------------------------------------------------------ + // Using Async State API + // ------------------------------------------------------------------------------ + + StateFuture asyncProcessElement(StreamRecord element, W window) + throws Exception { + TestTriggerContext triggerContext = + new TestTriggerContext<>( + KEY, window, internalTimerService, asyncStateBackend, windowSerializer); + return trigger.onElement( + element.getValue(), element.getTimestamp(), window, triggerContext); + } + + StateFuture asyncAdvanceProcessingTime(long time, W window) throws Exception { + Collection>> firings = + asyncAdvanceProcessingTime(time); + + if (firings.size() != 1) { + throw new IllegalStateException( + "Must have exactly one timer firing. Fired timers: " + firings); + } + + Tuple2> firing = firings.iterator().next(); + + if (!firing.f0.equals(window)) { + throw new IllegalStateException("Trigger fired for another window."); + } + + return firing.f1; + } + + StateFuture asyncAdvanceWatermark(long time, W window) throws Exception { + Collection>> firings = asyncAdvanceWatermark(time); + + if (firings.size() != 1) { + throw new IllegalStateException( + "Must have exactly one timer firing. Fired timers: " + firings); + } + + Tuple2> firing = firings.iterator().next(); + + if (!firing.f0.equals(window)) { + throw new IllegalStateException("Trigger fired for another window."); + } + + return firing.f1; + } + + Collection>> asyncAdvanceProcessingTime(long time) + throws Exception { + Collection> firedTimers = + internalTimerService.advanceProcessingTime(time); + + Collection>> result = new ArrayList<>(); + + for (TestInternalTimerService.Timer timer : firedTimers) { + TestTriggerContext triggerContext = + new TestTriggerContext<>( + KEY, + timer.getNamespace(), + internalTimerService, + asyncStateBackend, + windowSerializer); + + StateFuture triggerResult = + trigger.onProcessingTime( + timer.getTimestamp(), timer.getNamespace(), triggerContext); + + result.add(new Tuple2<>(timer.getNamespace(), triggerResult)); + } + + return result; + } + + Collection>> asyncAdvanceWatermark(long time) + throws Exception { + Collection> firedTimers = + internalTimerService.advanceWatermark(time); + + Collection>> result = new ArrayList<>(); + + for (TestInternalTimerService.Timer timer : firedTimers) { + StateFuture triggerResult = asyncInvokeOnEventTime(timer); + result.add(new Tuple2<>(timer.getNamespace(), triggerResult)); + } + + return result; + } + + private StateFuture asyncInvokeOnEventTime( + TestInternalTimerService.Timer timer) throws Exception { + TestTriggerContext triggerContext = + new TestTriggerContext<>( + KEY, + timer.getNamespace(), + internalTimerService, + asyncStateBackend, + windowSerializer); + + return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext); + } + + StateFuture asyncInvokeOnEventTime(long timestamp, W window) throws Exception { + TestInternalTimerService.Timer timer = + new TestInternalTimerService.Timer<>(timestamp, KEY, window); + + return asyncInvokeOnEventTime(timer); + } + + StateFuture asyncClearTriggerState(W window) throws Exception { + TestTriggerContext triggerContext = + new TestTriggerContext<>( + KEY, window, internalTimerService, asyncStateBackend, windowSerializer); + return trigger.clear(window, triggerContext); + } + + // ------------------------------------------------------------------------------ + // Context + // ------------------------------------------------------------------------------ + + private static class TestTriggerContext + implements AsyncTrigger.TriggerContext { + + protected final InternalTimerService timerService; + protected final AsyncKeyedStateBackend stateBackend; + protected final K key; + protected final W window; + protected final TypeSerializer windowSerializer; + + TestTriggerContext( + K key, + W window, + InternalTimerService timerService, + AsyncKeyedStateBackend stateBackend, + TypeSerializer windowSerializer) { + this.key = key; + this.window = window; + this.timerService = timerService; + this.stateBackend = stateBackend; + this.windowSerializer = windowSerializer; + } + + @Override + public long getCurrentProcessingTime() { + return timerService.currentProcessingTime(); + } + + @Override + public MetricGroup getMetricGroup() { + return null; + } + + @Override + public long getCurrentWatermark() { + return timerService.currentWatermark(); + } + + @Override + public void registerProcessingTimeTimer(long time) { + timerService.registerProcessingTimeTimer(window, time); + } + + @Override + public void registerEventTimeTimer(long time) { + timerService.registerEventTimeTimer(window, time); + } + + @Override + public void deleteProcessingTimeTimer(long time) { + timerService.deleteProcessingTimeTimer(window, time); + } + + @Override + public void deleteEventTimeTimer(long time) { + timerService.deleteEventTimeTimer(window, time); + } + + @Override + public S getPartitionedState(StateDescriptor stateDescriptor) { + try { + // single key (KEY), no need declaration. + return stateBackend.getOrCreateKeyedState( + window, windowSerializer, stateDescriptor); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + } + + static T completeStateFuture(StateFuture future) { + InternalAsyncFuture internalAsyncFuture = (InternalAsyncFuture) future; + + // The harness executes state operations in sync mode, any StateFuture should be completed. + Preconditions.checkArgument(internalAsyncFuture.isDone()); + return internalAsyncFuture.get(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java index 02454dd4d6822..9642500bc4806 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java @@ -54,13 +54,13 @@ /** Utility for testing {@link Trigger} behaviour. */ public class TriggerTestHarness { - private static final Integer KEY = 1; + protected static final Integer KEY = 1; private final Trigger trigger; - private final TypeSerializer windowSerializer; + protected final TypeSerializer windowSerializer; - private final HeapKeyedStateBackend stateBackend; - private final TestInternalTimerService internalTimerService; + protected final HeapKeyedStateBackend stateBackend; + protected final TestInternalTimerService internalTimerService; public TriggerTestHarness(Trigger trigger, TypeSerializer windowSerializer) throws Exception { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java new file mode 100644 index 0000000000000..ea602cb172fc3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.triggers; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.time.Duration; +import java.util.Objects; + +/** + * A {@link AsyncTrigger} that can turn any {@link AsyncTrigger} into a timeout {@code + * AsyncTrigger}. + * + *

On the first arriving element a configurable processing-time timeout will be set. Using {@link + * #of(AsyncTrigger, Duration, boolean, boolean)}, you can also re-new the timer for each arriving + * element by specifying {@code resetTimerOnNewRecord} and you can specify whether {@link + * AsyncTrigger#clear(Window, AsyncTrigger.TriggerContext)} should be called on timout via {@code + * shouldClearOnTimeout}. + * + * @param The type of elements on which this trigger can operate. + * @param The type of {@link Window} on which this trigger can operate. + */ +@Experimental +public class AsyncProcessingTimeoutTrigger extends AsyncTrigger { + private static final long serialVersionUID = 1L; + + private final AsyncTrigger nestedTrigger; + private final long interval; + private final boolean resetTimerOnNewRecord; + private final boolean shouldClearOnTimeout; + + private final ValueStateDescriptor timeoutStateDesc; + + public AsyncProcessingTimeoutTrigger( + AsyncTrigger nestedTrigger, + long interval, + boolean resetTimerOnNewRecord, + boolean shouldClearOnTimeout) { + this.nestedTrigger = nestedTrigger; + this.interval = interval; + this.resetTimerOnNewRecord = resetTimerOnNewRecord; + this.shouldClearOnTimeout = shouldClearOnTimeout; + + this.timeoutStateDesc = new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE); + } + + @Override + public StateFuture onElement( + T element, long timestamp, W window, TriggerContext ctx) throws Exception { + return this.nestedTrigger + .onElement(element, timestamp, window, ctx) + .thenConditionallyCompose( + TriggerResult::isFire, + triggerResult -> this.clear(window, ctx).thenApply(ignore -> triggerResult), + triggerResult -> { + ValueState timeoutState = + ctx.getPartitionedState(this.timeoutStateDesc); + long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval; + + return timeoutState + .asyncValue() + .thenConditionallyCompose( + Objects::nonNull, + timeoutTimestamp -> { + if (resetTimerOnNewRecord) { + ctx.deleteProcessingTimeTimer(timeoutTimestamp); + return timeoutState + .asyncClear() + .thenApply(ignore -> null); + } else { + return StateFutureUtils.completedFuture( + timeoutTimestamp); + } + }) + .thenConditionallyCompose( + tuple -> tuple.f1 /*timeoutTimestamp*/ == null, + ignore -> + timeoutState + .asyncUpdate(nextFireTimestamp) + .thenAccept( + ignore2 -> + ctx + .registerProcessingTimeTimer( + nextFireTimestamp))) + .thenApply(ignore -> triggerResult); + }) + .thenApply(tuple -> (TriggerResult) tuple.f1); + } + + @Override + public StateFuture onProcessingTime(long time, W window, TriggerContext ctx) + throws Exception { + return this.nestedTrigger + .onProcessingTime(time, window, ctx) + .thenCompose( + triggerResult -> { + TriggerResult finalResult = + triggerResult.isPurge() + ? TriggerResult.FIRE_AND_PURGE + : TriggerResult.FIRE; + return shouldClearOnTimeout + ? this.clear(window, ctx).thenApply(ignore -> finalResult) + : StateFutureUtils.completedFuture(finalResult); + }); + } + + @Override + public StateFuture onEventTime(long time, W window, TriggerContext ctx) + throws Exception { + return this.nestedTrigger + .onEventTime(time, window, ctx) + .thenCompose( + triggerResult -> { + TriggerResult finalResult = + triggerResult.isPurge() + ? TriggerResult.FIRE_AND_PURGE + : TriggerResult.FIRE; + return shouldClearOnTimeout + ? this.clear(window, ctx).thenApply(ignore -> finalResult) + : StateFutureUtils.completedFuture(finalResult); + }); + } + + @Override + public StateFuture clear(W window, TriggerContext ctx) throws Exception { + ValueState timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc); + return timeoutTimestampState + .asyncValue() + .thenConditionallyCompose( + Objects::nonNull, + timeoutTimestamp -> { + ctx.deleteProcessingTimeTimer(timeoutTimestamp); + return timeoutTimestampState.asyncClear(); + }) + .thenCompose(ignore -> this.nestedTrigger.clear(window, ctx)); + } + + @Override + public String toString() { + return "AsyncTimeoutTrigger(" + this.nestedTrigger.toString() + ")"; + } + + /** + * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the inner trigger is + * fired or when the timeout timer fires. + * + *

For example: {@code AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), 100)}, will + * create a AsyncCountTrigger with timeout of 100 millis. So, if the first record arrives at + * time {@code t}, and the second record arrives at time {@code t+50 }, the trigger will fire + * when the third record arrives or when the time is {code t+100} (timeout). + * + * @param nestedTrigger the nested {@link AsyncTrigger} + * @param timeout the timeout interval + * @return {@link AsyncProcessingTimeoutTrigger} with the above configuration. + */ + public static AsyncProcessingTimeoutTrigger of( + AsyncTrigger nestedTrigger, Duration timeout) { + return new AsyncProcessingTimeoutTrigger<>(nestedTrigger, timeout.toMillis(), false, true); + } + + /** + * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the inner trigger is + * fired or when the timeout timer fires. + * + *

For example: {@code AsyncProcessingTimeoutTrigger.of(CountTrigger.of(3), 100, false, + * true)}, will create a AsyncCountTrigger with timeout of 100 millis. So, if the first record + * arrives at time {@code t}, and the second record arrives at time {@code t+50 }, the trigger + * will fire when the third record arrives or when the time is {code t+100} (timeout). + * + * @param nestedTrigger the nested {@link AsyncTrigger} + * @param timeout the timeout interval + * @param resetTimerOnNewRecord each time a new element arrives, reset the timer and start a new + * one + * @param shouldClearOnTimeout whether to call {@link AsyncTrigger#clear(Window, + * AsyncTrigger.TriggerContext)} when the processing-time timer fires + * @param The type of the element. + * @param The type of {@link Window Windows} on which this trigger can operate. + * @return {@link AsyncProcessingTimeoutTrigger} with the above configuration. + */ + public static AsyncProcessingTimeoutTrigger of( + AsyncTrigger nestedTrigger, + Duration timeout, + boolean resetTimerOnNewRecord, + boolean shouldClearOnTimeout) { + return new AsyncProcessingTimeoutTrigger<>( + nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout); + } + + @VisibleForTesting + public AsyncTrigger getNestedTrigger() { + return nestedTrigger; + } + + @VisibleForTesting + public long getInterval() { + return interval; + } + + @VisibleForTesting + public boolean isResetTimerOnNewRecord() { + return resetTimerOnNewRecord; + } + + @VisibleForTesting + public boolean isShouldClearOnTimeout() { + return shouldClearOnTimeout; + } + + @VisibleForTesting + public ValueStateDescriptor getTimeoutStateDesc() { + return timeoutStateDesc; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java index 268edfbf14b14..a32eac2b35c89 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java @@ -22,7 +22,12 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConvertable; +import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter; + +import javax.annotation.Nonnull; import java.time.Duration; @@ -39,7 +44,8 @@ * @param The type of {@link Window} on which this trigger can operate. */ @PublicEvolving -public class ProcessingTimeoutTrigger extends Trigger { +public class ProcessingTimeoutTrigger extends Trigger + implements AsyncTriggerConvertable { private static final long serialVersionUID = 1L; @@ -169,4 +175,14 @@ public static ProcessingTimeoutTrigger of( return new ProcessingTimeoutTrigger<>( nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout); } + + @Override + @Nonnull + public AsyncTrigger convertToAsync() { + return AsyncProcessingTimeoutTrigger.of( + AsyncTriggerConverter.convertToAsync(this.nestedTrigger), + Duration.ofMillis(interval), + resetTimerOnNewRecord, + shouldClearOnTimeout); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java index 99a2593ab78b7..02c24018cc612 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java @@ -18,15 +18,21 @@ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; +import org.apache.flink.streaming.api.windowing.triggers.AsyncProcessingTimeoutTrigger; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; @@ -35,14 +41,17 @@ /** Tests for {@link ProcessingTimeoutTrigger}. */ class ProcessingTimeoutTriggerTest { - - @Test - void testWindowFireWithoutResetTimer() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void testWindowFireWithoutResetTimer(boolean enableAsyncState) throws Exception { + Trigger trigger = + ProcessingTimeoutTrigger.of(CountTrigger.of(3), Duration.ofMillis(50), false, true); TriggerTestHarness testHarness = - new TriggerTestHarness<>( - ProcessingTimeoutTrigger.of( - CountTrigger.of(3), Duration.ofMillis(50), false, true), - new TimeWindow.Serializer()); + enableAsyncState + ? new AsyncTriggerTestHarness<>( + AsyncTriggerConverter.convertToAsync(trigger), + new TimeWindow.Serializer()) + : new TriggerTestHarness<>(trigger, new TimeWindow.Serializer()); assertThat(testHarness.processElement(new StreamRecord<>(1), new TimeWindow(0, 2))) .isEqualTo(TriggerResult.CONTINUE); @@ -76,13 +85,17 @@ void testWindowFireWithoutResetTimer() throws Exception { .isEqualTo(TriggerResult.FIRE); } - @Test - void testWindowFireWithResetTimer() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void testWindowFireWithResetTimer(boolean enableAsyncState) throws Exception { + Trigger trigger = + ProcessingTimeoutTrigger.of(CountTrigger.of(3), Duration.ofMillis(50), true, true); TriggerTestHarness testHarness = - new TriggerTestHarness<>( - ProcessingTimeoutTrigger.of( - CountTrigger.of(3), Duration.ofMillis(50), true, true), - new TimeWindow.Serializer()); + enableAsyncState + ? new AsyncTriggerTestHarness<>( + AsyncTriggerConverter.convertToAsync(trigger), + new TimeWindow.Serializer()) + : new TriggerTestHarness<>(trigger, new TimeWindow.Serializer()); assertThrows( "Must have exactly one timer firing. Fired timers: []", @@ -125,13 +138,18 @@ void testWindowFireWithResetTimer() throws Exception { .isEqualTo(TriggerResult.FIRE); } - @Test - void testWindowFireWithoutClearOnTimeout() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void testWindowFireWithoutClearOnTimeout(boolean enableAsyncState) throws Exception { + Trigger trigger = + ProcessingTimeoutTrigger.of( + CountTrigger.of(3), Duration.ofMillis(50), false, false); TriggerTestHarness testHarness = - new TriggerTestHarness<>( - ProcessingTimeoutTrigger.of( - CountTrigger.of(3), Duration.ofMillis(50), false, false), - new TimeWindow.Serializer()); + enableAsyncState + ? new AsyncTriggerTestHarness<>( + AsyncTriggerConverter.convertToAsync(trigger), + new TimeWindow.Serializer()) + : new TriggerTestHarness<>(trigger, new TimeWindow.Serializer()); assertThat(testHarness.processElement(new StreamRecord<>(1), new TimeWindow(0, 2))) .isEqualTo(TriggerResult.CONTINUE); @@ -153,16 +171,21 @@ void testWindowFireWithoutClearOnTimeout() throws Exception { assertThat(testHarness.numEventTimeTimers()).isZero(); } - @Test - void testWindowPurgingWhenInnerTriggerIsPurging() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void testWindowPurgingWhenInnerTriggerIsPurging(boolean enableAsyncState) throws Exception { + Trigger trigger = + ProcessingTimeoutTrigger.of( + PurgingTrigger.of(ProcessingTimeTrigger.create()), + Duration.ofMillis(50), + false, + false); TriggerTestHarness testHarness = - new TriggerTestHarness<>( - ProcessingTimeoutTrigger.of( - PurgingTrigger.of(ProcessingTimeTrigger.create()), - Duration.ofMillis(50), - false, - false), - new TimeWindow.Serializer()); + enableAsyncState + ? new AsyncTriggerTestHarness<>( + AsyncTriggerConverter.convertToAsync(trigger), + new TimeWindow.Serializer()) + : new TriggerTestHarness<>(trigger, new TimeWindow.Serializer()); assertThat(testHarness.processElement(new StreamRecord<>(1), new TimeWindow(0, 2))) .isEqualTo(TriggerResult.CONTINUE); @@ -185,4 +208,25 @@ void testWindowPurgingWhenInnerTriggerIsPurging() throws Exception { assertThat(testHarness.numProcessingTimeTimers()).isOne(); assertThat(testHarness.numEventTimeTimers()).isZero(); } + + @Test + void testConvertToAsync() { + Trigger syncTrigger = + ProcessingTimeoutTrigger.of( + CountTrigger.of(2333), Duration.ofMillis(233), false, false); + + AsyncTrigger asyncTrigger = + AsyncTriggerConverter.convertToAsync(syncTrigger); + assertThat(asyncTrigger).isInstanceOf(AsyncProcessingTimeoutTrigger.class); + AsyncProcessingTimeoutTrigger asyncProcessingTimeoutTrigger = + (AsyncProcessingTimeoutTrigger) asyncTrigger; + assertThat(asyncProcessingTimeoutTrigger.getInterval()).isEqualTo(233); + assertThat(asyncProcessingTimeoutTrigger.isResetTimerOnNewRecord()).isFalse(); + assertThat(asyncProcessingTimeoutTrigger.isShouldClearOnTimeout()).isFalse(); + + AsyncTrigger nestedTrigger = + asyncProcessingTimeoutTrigger.getNestedTrigger(); + assertThat(nestedTrigger).isInstanceOf(AsyncCountTrigger.class); + assertThat(nestedTrigger.toString()).isEqualTo("AsyncCountTrigger(2333)"); + } } From 2ac81c1845d1faa7b6df3a127981e17980825952 Mon Sep 17 00:00:00 2001 From: xiarui Date: Thu, 25 Sep 2025 16:36:32 +0800 Subject: [PATCH 2/4] Fix annotation test failure --- .../operators/windowing/AsyncTriggerConvertable.java | 12 ++++++------ .../operators/windowing/AsyncTriggerConverter.java | 2 +- .../windowing/triggers/ProcessingTimeoutTrigger.java | 5 ++--- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java index ec7474ae42b53..9f371424c305e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java @@ -18,9 +18,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; -import org.apache.flink.annotation.Experimental; -import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; -import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.annotation.Internal; import javax.annotation.Nonnull; @@ -28,13 +26,15 @@ * Interface for declaring the ability to convert a sync {@code Trigger} to {@code AsyncTrigger}. * The trigger conversion happens in {@code AsyncTriggerConverter}. */ -@Experimental -public interface AsyncTriggerConvertable { +@Internal +public interface AsyncTriggerConvertable { /** * Convert to an {@code AsyncTrigger}. * + *

TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes @PublicEvolving. + * * @return The {@code AsyncTrigger} for async state processing. */ @Nonnull - AsyncTrigger convertToAsync(); + Object convertToAsync(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java index dc0d2c8f07f35..63bee07807d9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java @@ -64,7 +64,7 @@ public static AsyncTrigger convertToAsync(Trigger) trigger).getNestedTrigger())); } else if (trigger instanceof AsyncTriggerConvertable) { - return ((AsyncTriggerConvertable) trigger).convertToAsync(); + return (AsyncTrigger) ((AsyncTriggerConvertable) trigger).convertToAsync(); } else { return UserDefinedAsyncTrigger.of(trigger); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java index a32eac2b35c89..1a04b19901c36 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConvertable; import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter; @@ -45,7 +44,7 @@ */ @PublicEvolving public class ProcessingTimeoutTrigger extends Trigger - implements AsyncTriggerConvertable { + implements AsyncTriggerConvertable { private static final long serialVersionUID = 1L; @@ -178,7 +177,7 @@ public static ProcessingTimeoutTrigger of( @Override @Nonnull - public AsyncTrigger convertToAsync() { + public Object convertToAsync() { return AsyncProcessingTimeoutTrigger.of( AsyncTriggerConverter.convertToAsync(this.nestedTrigger), Duration.ofMillis(interval), From a4c1290952cb8048175d2ea4f1b42a5c3dbaa701 Mon Sep 17 00:00:00 2001 From: xiarui Date: Fri, 26 Sep 2025 15:07:14 +0800 Subject: [PATCH 3/4] Refine API in AsyncTriggerConverter --- .../windowing/AsyncTriggerConvertable.java | 40 ------ .../windowing/AsyncTriggerConverter.java | 33 ++++- .../triggers/ProcessingTimeoutTrigger.java | 3 +- .../windowing/AsyncTriggerConverterTest.java | 114 ++++++++++++++++++ 4 files changed, 143 insertions(+), 47 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java deleted file mode 100644 index 9f371424c305e..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConvertable.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; - -import javax.annotation.Nonnull; - -/** - * Interface for declaring the ability to convert a sync {@code Trigger} to {@code AsyncTrigger}. - * The trigger conversion happens in {@code AsyncTriggerConverter}. - */ -@Internal -public interface AsyncTriggerConvertable { - /** - * Convert to an {@code AsyncTrigger}. - * - *

TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes @PublicEvolving. - * - * @return The {@code AsyncTrigger} for async state processing. - */ - @Nonnull - Object convertToAsync(); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java index 63bee07807d9c..7e0e8955405c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; @@ -37,6 +39,8 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; +import javax.annotation.Nonnull; + /** * A converter from {@code Trigger} to {@code AsyncTrigger}. * @@ -48,10 +52,24 @@ *

Other triggers are wrapped as an {@code AsyncTrigger}, whose internal functions are executed * in sync mode. */ -public class AsyncTriggerConverter { +@Internal +public interface AsyncTriggerConverter { + + /** + * Convert to an {@code AsyncTrigger}. The default implementation is only a wrapper of the + * trigger, whose behaviours are all sync. + * + *

TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes @PublicEvolving. + * + * @return The {@code AsyncTrigger} for async state processing. + */ + @Nonnull + default Object convertToAsync() { + return UserDefinedAsyncTrigger.of((Trigger) AsyncTriggerConverter.this); + } @SuppressWarnings("unchecked") - public static AsyncTrigger convertToAsync(Trigger trigger) { + static AsyncTrigger convertToAsync(Trigger trigger) { if (trigger instanceof CountTrigger) { return (AsyncTrigger) AsyncCountTrigger.of(((CountTrigger) trigger).getMaxCount()); @@ -63,15 +81,15 @@ public static AsyncTrigger convertToAsync(Trigger) AsyncPurgingTrigger.of( convertToAsync(((PurgingTrigger) trigger).getNestedTrigger())); - } else if (trigger instanceof AsyncTriggerConvertable) { - return (AsyncTrigger) ((AsyncTriggerConvertable) trigger).convertToAsync(); + } else if (trigger instanceof AsyncTriggerConverter) { + return (AsyncTrigger) ((AsyncTriggerConverter) trigger).convertToAsync(); } else { return UserDefinedAsyncTrigger.of(trigger); } } /** Convert non-support user-defined trigger to {@code AsyncTrigger}. */ - private static class UserDefinedAsyncTrigger extends AsyncTrigger { + class UserDefinedAsyncTrigger extends AsyncTrigger { private final Trigger userDefinedTrigger; private UserDefinedAsyncTrigger(Trigger userDefinedTrigger) { @@ -175,5 +193,10 @@ public static Trigger.TriggerContext of( return new AsyncTriggerContextConvertor(asyncTriggerContext); } } + + @VisibleForTesting + public Trigger getUserDefinedTrigger() { + return userDefinedTrigger; + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java index 1a04b19901c36..7d972e8864374 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConvertable; import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter; import javax.annotation.Nonnull; @@ -44,7 +43,7 @@ */ @PublicEvolving public class ProcessingTimeoutTrigger extends Trigger - implements AsyncTriggerConvertable { + implements AsyncTriggerConverter { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java new file mode 100644 index 0000000000000..02c943a90f43c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@code AsyncTriggerConverter}. */ +public class AsyncTriggerConverterTest { + private static class DummyTriggerWithoutAsyncConverter extends Trigger + implements AsyncTriggerConverter { + @Override + public TriggerResult onElement( + Object element, long timestamp, TimeWindow window, TriggerContext ctx) + throws Exception { + return null; + } + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) + throws Exception { + return null; + } + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) + throws Exception { + return null; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} + } + + private static class DummyTriggerWithAsyncConverter extends DummyTriggerWithoutAsyncConverter { + @Override + @NotNull + public Object convertToAsync() { + return new DummyAsyncTrigger(); + } + } + + private static class DummyAsyncTrigger extends AsyncTrigger { + @Override + public StateFuture onElement( + Object element, long timestamp, TimeWindow window, TriggerContext ctx) + throws Exception { + return null; + } + + @Override + public StateFuture onProcessingTime( + long time, TimeWindow window, TriggerContext ctx) throws Exception { + return null; + } + + @Override + public StateFuture onEventTime( + long time, TimeWindow window, TriggerContext ctx) throws Exception { + return null; + } + + @Override + public StateFuture clear(TimeWindow window, TriggerContext ctx) throws Exception { + return null; + } + } + + @Test + void testTriggerUseDefaultConvert() { + Trigger syncTrigger = new DummyTriggerWithoutAsyncConverter(); + AsyncTrigger asyncTrigger = + AsyncTriggerConverter.convertToAsync(syncTrigger); + + assertThat(asyncTrigger).isInstanceOf(AsyncTriggerConverter.UserDefinedAsyncTrigger.class); + AsyncTriggerConverter.UserDefinedAsyncTrigger triggerWrapper = + (AsyncTriggerConverter.UserDefinedAsyncTrigger) asyncTrigger; + + assertThat(triggerWrapper.getUserDefinedTrigger()).isSameAs(syncTrigger); + } + + @Test + void testTriggerUseCustomizeConvert() { + Trigger syncTrigger = new DummyTriggerWithAsyncConverter(); + AsyncTrigger asyncTrigger = + AsyncTriggerConverter.convertToAsync(syncTrigger); + + assertThat(asyncTrigger).isInstanceOf(DummyAsyncTrigger.class); + } +} From 09a62f9f27dcdbc118cf57ccc861fca9304d7a5c Mon Sep 17 00:00:00 2001 From: xiarui Date: Fri, 26 Sep 2025 21:46:07 +0800 Subject: [PATCH 4/4] Fix checkstyle --- .../operators/windowing/AsyncTriggerConverterTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java index 02c943a90f43c..cf1fe75400b37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java @@ -24,9 +24,10 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; +import javax.annotation.Nonnull; + import static org.assertj.core.api.Assertions.assertThat; /** Test for {@code AsyncTriggerConverter}. */ @@ -58,7 +59,7 @@ public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} private static class DummyTriggerWithAsyncConverter extends DummyTriggerWithoutAsyncConverter { @Override - @NotNull + @Nonnull public Object convertToAsync() { return new DummyAsyncTrigger(); }