-
Notifications
You must be signed in to change notification settings - Fork 454
[kv] Support kv snapshot lease #2179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
05bc3d6 to
6611274
Compare
a2b00c8 to
7d9f6c2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds support for KV snapshot consumers to prevent premature deletion of snapshots that are still being consumed. The implementation introduces a consumer registration mechanism where clients can register their interest in specific snapshots, preventing them from being deleted until consumption is complete or the consumer expires.
Key Changes:
- Introduced
KvSnapshotConsumerdata structure to track snapshot consumption per consumer - Implemented
KvSnapshotConsumerManagerin the coordinator to manage consumer lifecycle and expiration - Modified snapshot retention logic to be consumer-aware rather than based on a fixed retention count
- Added Flink source integration to automatically register/unregister consumers during checkpoint lifecycle
Reviewed changes
Copilot reviewed 65 out of 65 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
KvSnapshotConsumer.java |
New data structure tracking consumed snapshots per table/partition/bucket |
KvSnapshotConsumerManager.java |
Manager for consumer registration, unregistration, and expiration checking |
CompletedSnapshotStore.java |
Modified to use consumer-aware retention instead of fixed count |
FlinkSourceEnumerator.java |
Registers consumers when initializing splits, unregisters on checkpoint complete |
FlinkSourceReader.java |
Reports finished snapshot consumption via source events |
ConfigOptions.java |
Added configuration for consumer expiration checking and default expiration time |
ZooKeeperClient.java |
Added methods for persisting consumer state in ZooKeeper |
| Various test files | Removed fixed snapshot retention configuration, added consumer-based tests |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/metadata/KvSnapshotLeaseForBucket.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotLease.java
Show resolved
Hide resolved
...common/src/main/java/org/apache/fluss/flink/source/event/FinishedKvSnapshotConsumeEvent.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java
Outdated
Show resolved
Hide resolved
799ab05 to
2a9a846
Compare
b53fa8a to
c600bf0
Compare
c600bf0 to
922040f
Compare
922040f to
1730d60
Compare
Purpose
Linked issue: close #2171
Support kv snapshot lease
Brief change log
Tests
API and Format
Documentation