
Commit 39e6712

[FLINK-16686][State] Set classloader for compaction filters

1 parent 358a28e

7 files changed: +419 −20

flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java

Lines changed: 5 additions & 2 deletions
@@ -89,11 +89,12 @@ private CheckpointStreamFactory createCheckpointStreamFactory() {
         }
     }
 
-    public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
+    public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) throws IOException {
         createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
     }
 
-    void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot) {
+    void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot)
+            throws IOException {
         Collection<KeyedStateHandle> stateHandles;
         if (snapshot == null) {
             stateHandles = Collections.emptyList();
@@ -102,6 +103,8 @@ void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle s
             stateHandles.add(snapshot);
         }
         env = MockEnvironment.builder().build();
+        env.setCheckpointStorageAccess(
+                createCheckpointStorage().createCheckpointStorage(new JobID()));
         try {
             disposeKeyedStateBackend();
             keyedStateBackend =

flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateWithKryoTestContext.java

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Test suite for {@link TtlListState} with elements serialized by Kryo. */
+public class TtlListStateWithKryoTestContext
+        extends TtlListStateTestContextBase<TtlListStateWithKryoTestContext.NotPojoElement> {
+    TtlListStateWithKryoTestContext() {
+        super(new KryoSerializer<>(NotPojoElement.class, getForceKryoSerializerConfig()));
+    }
+
+    private static SerializerConfig getForceKryoSerializerConfig() {
+        Configuration config = new Configuration();
+        config.set(PipelineOptions.FORCE_KRYO, true);
+        return new SerializerConfigImpl(config);
+    }
+
+    @Override
+    NotPojoElement generateRandomElement(int i) {
+        return new NotPojoElement(RANDOM.nextInt(100));
+    }
+
+    @Override
+    void initTestValues() {
+        emptyValue = Collections.emptyList();
+
+        updateEmpty =
+                Arrays.asList(new NotPojoElement(5), new NotPojoElement(7), new NotPojoElement(10));
+        updateUnexpired =
+                Arrays.asList(new NotPojoElement(8), new NotPojoElement(9), new NotPojoElement(11));
+        updateExpired = Arrays.asList(new NotPojoElement(1), new NotPojoElement(4));
+
+        getUpdateEmpty = updateEmpty;
+        getUnexpired = updateUnexpired;
+        getUpdateExpired = updateExpired;
+    }
+
+    public static class NotPojoElement {
+        public int value;
+
+        public NotPojoElement(int value) {
+            this.value = value;
+        }
+
+        @Override
+        public String toString() {
+            return "NotPojoElement{" + "value=" + value + '}';
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            NotPojoElement that = (NotPojoElement) obj;
+            return value == that.value;
+        }
+
+        @Override
+        public int hashCode() {
+            return Integer.hashCode(value);
+        }
+    }
+}
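
For context on why the classloader matters for this test type: NotPojoElement has no no-argument constructor and the config forces Kryo, so the element serializer must be able to resolve the element class when list entries are deserialized, for example inside the TTL compaction filter. The following is a rough, self-contained sketch of that round trip; the class and variable names are illustrative and not part of the commit, and it only assumes the Flink serialization APIs already shown in the diff above.

import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

/** Illustrative (hypothetical) Kryo round trip for a non-POJO element type. */
final class KryoRoundTripSketch {

    // Not a POJO by Flink's rules (no public no-argument constructor), so it lands on Kryo.
    public static class Element {
        public int value;

        public Element(int value) {
            this.value = value;
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration config = new Configuration();
        config.set(PipelineOptions.FORCE_KRYO, true);
        KryoSerializer<Element> serializer =
                new KryoSerializer<>(Element.class, new SerializerConfigImpl(config));

        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(new Element(42), out);

        // Deserialization is the step that needs a classloader able to see Element;
        // in the compaction filter this happens on a native background thread.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        System.out.println(serializer.deserialize(in).value);
    }
}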

flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ protected abstract StateBackendTestContext createStateBackendTestContext(
                     new TtlValueStateTestContext(),
                     new TtlFixedLenElemListStateTestContext(),
                     new TtlNonFixedLenElemListStateTestContext(),
+                    new TtlListStateWithKryoTestContext(),
                     new TtlMapStateAllEntriesTestContext(),
                     new TtlMapStatePerElementTestContext(),
                     new TtlMapStatePerNullElementTestContext(),

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java

Lines changed: 55 additions & 9 deletions
@@ -48,6 +48,7 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.LinkedHashMap;
+import java.util.function.Supplier;
 
 /** RocksDB compaction filter utils for state with TTL. */
 public class ForStDBTtlCompactFiltersManager {
@@ -205,15 +206,27 @@ public void configCompactFilter(
 
     private static class ListElementFilterFactory<T>
             implements FlinkCompactionFilter.ListElementFilterFactory {
-        private final TypeSerializer<T> serializer;
+        // {@See #createListElementFilter}.
+        private final ThreadLocalSerializerProvider<T> threadLocalSerializer;
 
         private ListElementFilterFactory(TypeSerializer<T> serializer) {
-            this.serializer = serializer;
+            ClassLoader contextClassLoader = null;
+            try {
+                contextClassLoader = Thread.currentThread().getContextClassLoader();
+            } catch (Throwable e) {
+                LOG.info("Cannot get context classloader for list state's compaction filter.", e);
+            }
+            threadLocalSerializer =
+                    new ThreadLocalSerializerProvider<>(serializer, contextClassLoader);
         }
 
         @Override
         public FlinkCompactionFilter.ListElementFilter createListElementFilter() {
-            return new ListElementFilter<>(serializer);
+            // This method will be invoked by native code multiple times when creating compaction
+            // filter. And the created filter will be shared by multiple background threads.
+            // Make sure the serializer is thread-local and has classloader set for each thread
+            // correctly and individually.
+            return new ListElementFilter<>(threadLocalSerializer);
         }
     }
 
@@ -231,21 +244,22 @@ public long currentTimestamp() {
     }
 
     private static class ListElementFilter<T> implements FlinkCompactionFilter.ListElementFilter {
-        private final TypeSerializer<T> serializer;
-        private DataInputDeserializer input;
+        private final ThreadLocalSerializerProvider<T> threadLocalSerializer;
+        private final DataInputDeserializer input;
 
-        private ListElementFilter(TypeSerializer<T> serializer) {
-            this.serializer = serializer;
+        private ListElementFilter(ThreadLocalSerializerProvider<T> serializer) {
+            this.threadLocalSerializer = serializer;
             this.input = new DataInputDeserializer();
         }
 
         @Override
         public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
            input.setBuffer(bytes);
            int lastElementOffset = 0;
+            TypeSerializer<T> serializer = threadLocalSerializer.get();
            while (input.available() > 0) {
                try {
-                    long timestamp = nextElementLastAccessTimestamp();
+                    long timestamp = nextElementLastAccessTimestamp(serializer);
                    if (!TtlUtils.expired(timestamp, ttl, currentTimestamp)) {
                        break;
                    }
@@ -258,7 +272,8 @@ public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
             return lastElementOffset;
         }
 
-        private long nextElementLastAccessTimestamp() throws IOException {
+        private long nextElementLastAccessTimestamp(TypeSerializer<T> serializer)
+                throws IOException {
             TtlValue<?> ttlValue = (TtlValue<?>) serializer.deserialize(input);
             if (input.available() > 0) {
                 input.skipBytesToRead(1);
@@ -267,6 +282,37 @@ private long nextElementLastAccessTimestamp() throws IOException {
             }
         }
     }
 
+    private static class ThreadLocalSerializerProvider<T> implements Supplier<TypeSerializer<T>> {
+        // Multiple background threads may share the same filter instance, so we need to make sure
+        // the serializer is thread-local, and every thread has its own instance with classloader.
+        private final ThreadLocal<TypeSerializer<T>> threadLocalSerializer;
+
+        public ThreadLocalSerializerProvider(
+                TypeSerializer<T> serializer, ClassLoader classLoader) {
+            this.threadLocalSerializer =
+                    ThreadLocal.withInitial(
+                            () -> {
+                                setClassloaderIfNeeded(classLoader);
+                                return serializer.duplicate();
+                            });
+        }
+
+        private void setClassloaderIfNeeded(ClassLoader classLoader) {
+            // The classloader that should be set to the current thread when deserializing.
+            // The reason why we should set classloader is that the serializer may be Kryo
+            // serializer which needs user classloader to load user classes.
+            // See FLINK-16686 for more details.
+            if (classLoader != null) {
+                Thread.currentThread().setContextClassLoader(classLoader);
+            }
+        }
+
+        @Override
+        public TypeSerializer<T> get() {
+            return threadLocalSerializer.get();
+        }
+    }
+
     public void disposeAndClearRegisteredCompactionFactories() {
         for (FlinkCompactionFilterFactory factory : compactionFilterFactories.values()) {
             IOUtils.closeQuietly(factory);
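
The reasoning in the comments above reduces to a small pattern: the filter object handed to native code is shared by several background compaction threads, each of which needs its own serializer copy and a properly installed context classloader before its first deserialization. Below is a minimal standalone sketch of that pattern; the class and parameter names are illustrative and not taken from the commit, and the serializer stand-in is supplied as a generic factory so the sketch uses only JDK APIs.

import java.util.function.Supplier;

/**
 * Minimal sketch of the thread-local provider pattern used in the change above. Each thread that
 * asks for a value first gets the user classloader installed as its context classloader and then
 * receives its own duplicate, because serializers are generally not safe to share across threads.
 */
final class ThreadLocalWithClassLoader<T> implements Supplier<T> {

    private final ThreadLocal<T> perThread;

    ThreadLocalWithClassLoader(Supplier<T> duplicator, ClassLoader userClassLoader) {
        this.perThread =
                ThreadLocal.withInitial(
                        () -> {
                            // Compaction threads are spawned by native code and do not inherit
                            // the task's context classloader, so install it before first use.
                            if (userClassLoader != null) {
                                Thread.currentThread().setContextClassLoader(userClassLoader);
                            }
                            // Give every thread its own copy of the shared template object.
                            return duplicator.get();
                        });
    }

    @Override
    public T get() {
        return perThread.get();
    }
}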

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java

Lines changed: 8 additions & 0 deletions
@@ -74,6 +74,7 @@
 import org.forstdb.ColumnFamilyOptions;
 import org.forstdb.ReadOptions;
 import org.forstdb.RocksDB;
+import org.forstdb.RocksDBException;
 import org.forstdb.Snapshot;
 import org.forstdb.WriteOptions;
 import org.slf4j.Logger;
@@ -944,6 +945,13 @@ public boolean isSafeToReuseKVState() {
         return true;
     }
 
+    @VisibleForTesting
+    public void compactState(StateDescriptor<?, ?> stateDesc) throws RocksDBException {
+        ForStOperationUtils.ForStKvStateInfo kvStateInfo =
+                kvStateInformation.get(stateDesc.getName());
+        db.compactRange(kvStateInfo.columnFamilyHandle);
+    }
+
     @Nonnegative
     long getWriteBatchSize() {
         return writeBatchSize;
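
The new @VisibleForTesting hook lets TTL tests force the compaction filter to run instead of waiting for a background compaction to be scheduled naturally. A hypothetical test helper (the names below are illustrative, not from this commit) would simply trigger a full compaction of the column family backing a given state:

import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;

import org.forstdb.RocksDBException;

/** Hypothetical helper that forces a compaction so the TTL filter is exercised in tests. */
final class TtlCompactionTrigger {

    private TtlCompactionTrigger() {}

    static void compact(ForStSyncKeyedStateBackend<?> backend, StateDescriptor<?, ?> descriptor)
            throws RocksDBException {
        // Runs a manual full compaction of the state's column family; the registered
        // compaction filter (using the thread-local serializer above) drops expired entries.
        backend.compactState(descriptor);
    }
}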
