Skip to content

Conversation

Zakelly
Copy link
Contributor

@Zakelly Zakelly commented Sep 22, 2025

What is the purpose of the change

The RocksDB/ForSt 's list state use the user serializer to deserialize the list elements and filter the expired ones during compation. But the compaction runs in background threads that the RocksDB/ForSt allocated. These native threads do not have the JVM context, which is problematic when invoking some serializers that require JVM capabilities, such as Kryo that uses the classloader. Multiple issues have been reported: https://stackoverflow.com/questions/67321286/flink-application-fails-when-rocksdb-list-state-is-cleaned-by-ttl

This PR captured the user classloader from the thread that uses to create the compaction filter factory, and pass that to the native threads, solving the problem above.

Brief change log

  • Change the ListElementFilterFactory taking the current thread's classloader and pass it to the created ListElementFilter later.
  • Add thread-local initializer to set classloader and duplicate serializer in ListElementFilter.
  • Add same logic above for ForSt as well as RocksDB
  • Add tests.

Verifying this change

This change is already covered by existing tests, such as under (Full|Inc)SnapshotRocksDbTtlStateTest and ForStSyncTtlStateTest under TtlListStateWithKryoTestContext.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Sep 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@Zakelly
Copy link
Contributor Author

Zakelly commented Sep 24, 2025

@AlexYinHan @rkhachatryan mind take a look? Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants