Skip to content

Commit e4cd5ed

Browse files
committed
[FLINK-38364][runtime] Implement async state version of ProcessingTimeoutTrigger
1 parent 335621e commit e4cd5ed

File tree

4 files changed

+608
-33
lines changed

4 files changed

+608
-33
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.common.state.v2.StateFuture;
23+
import org.apache.flink.api.common.state.v2.ValueState;
24+
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
25+
import org.apache.flink.api.common.typeutils.base.LongSerializer;
26+
import org.apache.flink.core.state.StateFutureUtils;
27+
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
28+
import org.apache.flink.streaming.api.windowing.windows.Window;
29+
30+
import java.time.Duration;
31+
import java.util.Objects;
32+
33+
/**
34+
* A {@link AsyncTrigger} that can turn any {@link AsyncTrigger} into a timeout {@code
35+
* AsyncTrigger}.
36+
*
37+
* <p>On the first arriving element a configurable processing-time timeout will be set. Using {@link
38+
* #of(AsyncTrigger, Duration, boolean, boolean)}, you can also re-new the timer for each arriving
39+
* element by specifying {@code resetTimerOnNewRecord} and you can specify whether {@link
40+
* AsyncTrigger#clear(Window, AsyncTrigger.TriggerContext)} should be called on timout via {@code
41+
* shouldClearOnTimeout}.
42+
*
43+
* @param <T> The type of elements on which this trigger can operate.
44+
* @param <W> The type of {@link Window} on which this trigger can operate.
45+
*/
46+
@Experimental
47+
public class AsyncProcessingTimeoutTrigger<T, W extends Window> extends AsyncTrigger<T, W> {
48+
private static final long serialVersionUID = 1L;
49+
50+
private final AsyncTrigger<T, W> nestedTrigger;
51+
private final long interval;
52+
private final boolean resetTimerOnNewRecord;
53+
private final boolean shouldClearOnTimeout;
54+
55+
private final ValueStateDescriptor<Long> timeoutStateDesc;
56+
57+
public AsyncProcessingTimeoutTrigger(
58+
AsyncTrigger<T, W> nestedTrigger,
59+
long interval,
60+
boolean resetTimerOnNewRecord,
61+
boolean shouldClearOnTimeout) {
62+
this.nestedTrigger = nestedTrigger;
63+
this.interval = interval;
64+
this.resetTimerOnNewRecord = resetTimerOnNewRecord;
65+
this.shouldClearOnTimeout = shouldClearOnTimeout;
66+
67+
this.timeoutStateDesc = new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);
68+
}
69+
70+
@Override
71+
public StateFuture<TriggerResult> onElement(
72+
T element, long timestamp, W window, TriggerContext ctx) throws Exception {
73+
return this.nestedTrigger
74+
.onElement(element, timestamp, window, ctx)
75+
.thenConditionallyCompose(
76+
TriggerResult::isFire,
77+
triggerResult -> this.clear(window, ctx).thenApply(ignore -> triggerResult),
78+
triggerResult -> {
79+
ValueState<Long> timeoutState =
80+
ctx.getPartitionedState(this.timeoutStateDesc);
81+
long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
82+
83+
return timeoutState
84+
.asyncValue()
85+
.thenConditionallyCompose(
86+
Objects::nonNull,
87+
timeoutTimestamp -> {
88+
if (resetTimerOnNewRecord) {
89+
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
90+
return timeoutState
91+
.asyncClear()
92+
.thenApply(ignore -> null);
93+
} else {
94+
return StateFutureUtils.completedFuture(
95+
timeoutTimestamp);
96+
}
97+
})
98+
.thenConditionallyCompose(
99+
tuple -> tuple.f1 /*timeoutTimestamp*/ == null,
100+
ignore ->
101+
timeoutState
102+
.asyncUpdate(nextFireTimestamp)
103+
.thenAccept(
104+
ignore2 ->
105+
ctx
106+
.registerProcessingTimeTimer(
107+
nextFireTimestamp)))
108+
.thenApply(ignore -> triggerResult);
109+
})
110+
.thenApply(tuple -> (TriggerResult) tuple.f1);
111+
}
112+
113+
@Override
114+
public StateFuture<TriggerResult> onProcessingTime(long time, W window, TriggerContext ctx)
115+
throws Exception {
116+
return this.nestedTrigger
117+
.onProcessingTime(time, window, ctx)
118+
.thenCompose(
119+
triggerResult -> {
120+
TriggerResult finalResult =
121+
triggerResult.isPurge()
122+
? TriggerResult.FIRE_AND_PURGE
123+
: TriggerResult.FIRE;
124+
return shouldClearOnTimeout
125+
? this.clear(window, ctx).thenApply(ignore -> finalResult)
126+
: StateFutureUtils.completedFuture(finalResult);
127+
});
128+
}
129+
130+
@Override
131+
public StateFuture<TriggerResult> onEventTime(long time, W window, TriggerContext ctx)
132+
throws Exception {
133+
return this.nestedTrigger
134+
.onEventTime(time, window, ctx)
135+
.thenCompose(
136+
triggerResult -> {
137+
TriggerResult finalResult =
138+
triggerResult.isPurge()
139+
? TriggerResult.FIRE_AND_PURGE
140+
: TriggerResult.FIRE;
141+
return shouldClearOnTimeout
142+
? this.clear(window, ctx).thenApply(ignore -> finalResult)
143+
: StateFutureUtils.completedFuture(finalResult);
144+
});
145+
}
146+
147+
@Override
148+
public StateFuture<Void> clear(W window, TriggerContext ctx) throws Exception {
149+
ValueState<Long> timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc);
150+
return timeoutTimestampState
151+
.asyncValue()
152+
.thenConditionallyCompose(
153+
Objects::nonNull,
154+
timeoutTimestamp -> {
155+
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
156+
return timeoutTimestampState.asyncClear();
157+
})
158+
.thenCompose(ignore -> this.nestedTrigger.clear(window, ctx));
159+
}
160+
161+
@Override
162+
public String toString() {
163+
return "AsyncTimeoutTrigger(" + this.nestedTrigger.toString() + ")";
164+
}
165+
166+
/**
167+
* Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the inner trigger is
168+
* fired or when the timeout timer fires.
169+
*
170+
* <p>For example: {@code AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), 100)}, will
171+
* create a AsyncCountTrigger with timeout of 100 millis. So, if the first record arrives at
172+
* time {@code t}, and the second record arrives at time {@code t+50 }, the trigger will fire
173+
* when the third record arrives or when the time is {code t+100} (timeout).
174+
*
175+
* @param nestedTrigger the nested {@link AsyncTrigger}
176+
* @param timeout the timeout interval
177+
* @return {@link AsyncProcessingTimeoutTrigger} with the above configuration.
178+
*/
179+
public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(
180+
AsyncTrigger<T, W> nestedTrigger, Duration timeout) {
181+
return new AsyncProcessingTimeoutTrigger<>(nestedTrigger, timeout.toMillis(), false, true);
182+
}
183+
184+
/**
185+
* Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the inner trigger is
186+
* fired or when the timeout timer fires.
187+
*
188+
* <p>For example: {@code AsyncProcessingTimeoutTrigger.of(CountTrigger.of(3), 100, false,
189+
* true)}, will create a AsyncCountTrigger with timeout of 100 millis. So, if the first record
190+
* arrives at time {@code t}, and the second record arrives at time {@code t+50 }, the trigger
191+
* will fire when the third record arrives or when the time is {code t+100} (timeout).
192+
*
193+
* @param nestedTrigger the nested {@link AsyncTrigger}
194+
* @param timeout the timeout interval
195+
* @param resetTimerOnNewRecord each time a new element arrives, reset the timer and start a new
196+
* one
197+
* @param shouldClearOnTimeout whether to call {@link AsyncTrigger#clear(Window,
198+
* AsyncTrigger.TriggerContext)} when the processing-time timer fires
199+
* @param <T> The type of the element.
200+
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
201+
* @return {@link AsyncProcessingTimeoutTrigger} with the above configuration.
202+
*/
203+
public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(
204+
AsyncTrigger<T, W> nestedTrigger,
205+
Duration timeout,
206+
boolean resetTimerOnNewRecord,
207+
boolean shouldClearOnTimeout) {
208+
return new AsyncProcessingTimeoutTrigger<>(
209+
nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
210+
}
211+
}

flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
/** Utility for testing {@link Trigger} behaviour. */
5555
public class TriggerTestHarness<T, W extends Window> {
5656

57-
private static final Integer KEY = 1;
57+
protected static final Integer KEY = 1;
5858

5959
private final Trigger<T, W> trigger;
60-
private final TypeSerializer<W> windowSerializer;
60+
protected final TypeSerializer<W> windowSerializer;
6161

62-
private final HeapKeyedStateBackend<Integer> stateBackend;
63-
private final TestInternalTimerService<Integer, W> internalTimerService;
62+
protected final HeapKeyedStateBackend<Integer> stateBackend;
63+
protected final TestInternalTimerService<Integer, W> internalTimerService;
6464

6565
public TriggerTestHarness(Trigger<T, W> trigger, TypeSerializer<W> windowSerializer)
6666
throws Exception {

0 commit comments

Comments
 (0)