From 3ab4f8394578319bdab92ed46f9085541f015618 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 6 Jan 2026 22:49:48 +0800 Subject: [PATCH 01/10] transaction: Add resolve_locks method Signed-off-by: Ping Yu --- src/transaction/client.rs | 33 ++++++++++++++++++++++++++++++ tests/failpoint_tests.rs | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 22873b0c..97deaeb3 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -9,6 +9,7 @@ use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; +use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; use crate::request::plan::CleanupLocksResult; use crate::request::EncodeKeyspace; @@ -19,6 +20,7 @@ use crate::timestamp::TimestampExt; use crate::transaction::lock::ResolveLocksOptions; use crate::transaction::lowering::new_scan_lock_request; use crate::transaction::lowering::new_unsafe_destroy_range_request; +use crate::transaction::resolve_locks; use crate::transaction::ResolveLocksContext; use crate::transaction::Snapshot; use crate::transaction::Transaction; @@ -315,6 +317,37 @@ impl Client { Ok(plan.execute().await?.truncate_keyspace(self.keyspace)) } + /// Resolves the given locks and returns any that remain live. + /// + /// This method retries until either all locks are resolved or the provided + /// `backoff` is exhausted. The `timestamp` is used as the caller start + /// timestamp when checking transaction status. + pub async fn resolve_locks( + &self, + locks: Vec, + timestamp: Timestamp, + mut backoff: Backoff, + ) -> Result> { + let mut live_locks = locks; + loop { + live_locks = resolve_locks(live_locks, timestamp.clone(), self.pd.clone()).await?; + if live_locks.is_empty() { + return Ok(live_locks); + } + + if backoff.is_none() { + return Ok(live_locks); + } + + match backoff.next_delay_duration() { + None => return Ok(live_locks), + Some(delay_duration) => { + tokio::time::sleep(delay_duration).await; + } + } + } + } + /// Cleans up all keys in a range and quickly reclaim disk space. /// /// The range can span over multiple regions. diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index 20379bcc..bec38ead 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -282,6 +282,48 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn txn_resolve_locks() -> Result<()> { + init().await?; + let scenario = FailScenario::setup(); + + fail::cfg("after-prewrite", "return").unwrap(); + defer! {{ + fail::cfg("after-prewrite", "off").unwrap(); + }} + + let client = TransactionClient::new(pd_addrs()).await?; + let key = b"resolve-locks-key".to_vec(); + let keys = HashSet::from_iter(vec![key.clone()]); + let mut txn = client + .begin_with_options( + TransactionOptions::new_optimistic() + .heartbeat_option(HeartbeatOption::NoHeartbeat) + .drop_check(CheckLevel::Warn), + ) + .await?; + txn.put(key.clone(), b"value".to_vec()).await?; + assert!(txn.commit().await.is_err()); + + let safepoint = client.current_timestamp().await?; + let locks = client.scan_locks(&safepoint, vec![].., 1024).await?; + assert!(locks.iter().any(|lock| lock.key == key)); + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + let start_version = client.current_timestamp().await?; + let live_locks = client + .resolve_locks(locks, start_version, OPTIMISTIC_BACKOFF) + .await?; + assert!(live_locks.is_empty()); + assert_eq!(count_locks(&client).await?, 0); + must_rollbacked(&client, keys).await; + + scenario.teardown(); + Ok(()) +} + #[tokio::test] #[serial] async fn txn_cleanup_2pc_locks() -> Result<()> { From 4189eba5580e5d6c94add3db9cb85117a8d6175b Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 13 Jan 2026 12:16:53 +0800 Subject: [PATCH 02/10] public scan_locks Signed-off-by: Ping Yu --- src/transaction/client.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 97deaeb3..81489a01 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -297,9 +297,7 @@ impl Client { plan.execute().await } - // For test. // Note: `batch_size` must be >= expected number of locks. - #[cfg(feature = "integration-tests")] pub async fn scan_locks( &self, safepoint: &Timestamp, From 507863181de758b3fad92076442e63b926c45f3b Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 20 Jan 2026 22:27:56 +0800 Subject: [PATCH 03/10] handle region error Signed-off-by: Ping Yu --- src/request/plan.rs | 3 +- src/transaction/client.rs | 4 +- src/transaction/lock.rs | 91 +++++++++++++++++++++++++++++---------- 3 files changed, 72 insertions(+), 26 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index b87a2f47..699dc5bd 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -88,7 +88,7 @@ impl StoreRequest for Dispatch { const MULTI_REGION_CONCURRENCY: usize = 16; const MULTI_STORES_CONCURRENCY: usize = 16; -fn is_grpc_error(e: &Error) -> bool { +pub(crate) fn is_grpc_error(e: &Error) -> bool { matches!(e, Error::GrpcAPI(_) | Error::Grpc(_)) } @@ -295,7 +295,6 @@ pub(crate) async fn handle_region_error( e: errorpb::Error, region_store: RegionStore, ) -> Result { - debug!("handle_region_error: {:?}", e); let ver_id = region_store.region_with_leader.ver_id(); let store_id = region_store.region_with_leader.get_store_id(); if let Some(not_leader) = e.not_leader { diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 81489a01..9f9147b9 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -328,7 +328,9 @@ impl Client { ) -> Result> { let mut live_locks = locks; loop { - live_locks = resolve_locks(live_locks, timestamp.clone(), self.pd.clone()).await?; + live_locks = + resolve_locks(live_locks, timestamp.clone(), self.pd.clone(), self.keyspace) + .await?; if live_locks.is_empty() { return Ok(live_locks); } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index f899e8c9..3defab83 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -20,6 +20,8 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::pdpb::Timestamp; use crate::region::RegionVerId; +use crate::request::plan::handle_region_error; +use crate::request::plan::is_grpc_error; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Keyspace; @@ -35,13 +37,10 @@ use crate::transaction::requests::TransactionStatusKind; use crate::Error; use crate::Result; -const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; - fn format_key_for_log(key: &[u8]) -> String { let prefix_len = key.len().min(16); format!("len={}, prefix={:?}", key.len(), &key[..prefix_len]) } - /// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved. /// /// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock, @@ -130,6 +129,7 @@ pub async fn resolve_locks( lock.is_txn_file, pd_client.clone(), keyspace, + OPTIMISTIC_BACKOFF, ) .await?; clean_regions @@ -148,21 +148,35 @@ async fn resolve_lock_with_retry( is_txn_file: bool, pd_client: Arc, keyspace: Keyspace, + mut backoff: Backoff, ) -> Result { debug!("resolving locks with retry"); - // FIXME: Add backoff - let mut error = None; - for i in 0..RESOLVE_LOCK_RETRY_LIMIT { - debug!("resolving locks: attempt {}", (i + 1)); + let mut attempt = 0; + loop { + attempt += 1; + debug!("resolving locks: attempt {}", attempt); let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version, is_txn_file); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) - .single_region_with_store(store) - .await? - .extract_error() - .plan(); + let plan_builder = match crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) + .single_region_with_store(store.clone()) + .await + { + Ok(plan_builder) => plan_builder, + Err(Error::LeaderNotFound { region }) => { + pd_client.invalidate_region_cache(region.clone()).await; + match backoff.next_delay_duration() { + Some(duration) => { + sleep(duration).await; + continue; + } + None => return Err(Error::LeaderNotFound { region }), + } + } + Err(err) => return Err(err), + }; + let plan = plan_builder.extract_error().plan(); match plan.execute().await { Ok(_) => { return Ok(ver_id); @@ -171,10 +185,17 @@ async fn resolve_lock_with_retry( Err(Error::ExtractedErrors(mut errors)) => { // ResolveLockResponse can have at most 1 error match errors.pop() { - e @ Some(Error::RegionError(_)) => { - error = e; - continue; - } + Some(Error::RegionError(e)) => match backoff.next_delay_duration() { + Some(duration) => { + let region_error_resolved = + handle_region_error(pd_client.clone(), *e, store.clone()).await?; + if !region_error_resolved { + sleep(duration).await; + } + continue; + } + None => return Err(Error::RegionError(e)), + }, Some(Error::KeyError(key_err)) => { // Keyspace is not truncated here because we need full key info for logging. error!( @@ -191,10 +212,19 @@ async fn resolve_lock_with_retry( None => unreachable!(), } } + Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() { + Some(duration) => { + if let Ok(store_id) = store.region_with_leader.get_store_id() { + pd_client.invalidate_store_cache(store_id).await; + } + sleep(duration).await; + continue; + } + None => return Err(e), + }, Err(e) => return Err(e), } } - Err(error.expect("no error is impossible")) } #[derive(Default, Clone)] @@ -577,6 +607,7 @@ mod tests { use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; + use fail::FailScenario; use serial_test::serial; use super::*; @@ -590,8 +621,17 @@ mod tests { #[tokio::test] #[serial] async fn test_resolve_lock_with_retry(#[case] keyspace: Keyspace) { + let _scenario = FailScenario::setup(); + + const MAX_REGION_ERROR_RETRIES: u32 = 10; + let backoff = Backoff::no_jitter_backoff(0, 0, MAX_REGION_ERROR_RETRIES); + // Test resolve lock within retry limit - fail::cfg("region-error", "9*return").unwrap(); + fail::cfg( + "region-error", + &format!("{}*return", MAX_REGION_ERROR_RETRIES), + ) + .unwrap(); let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { @@ -608,15 +648,20 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = resolve_lock_with_retry(&key, 1, 2, false, client.clone(), keyspace) - .await - .unwrap(); + let resolved_region = + resolve_lock_with_retry(&key, 1, 2, false, client.clone(), keyspace, backoff.clone()) + .await + .unwrap(); assert_eq!(region1.ver_id(), resolved_region); // Test resolve lock over retry limit - fail::cfg("region-error", "10*return").unwrap(); + fail::cfg( + "region-error", + &format!("{}*return", MAX_REGION_ERROR_RETRIES + 1), + ) + .unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, false, client, keyspace) + resolve_lock_with_retry(&key, 3, 4, false, client, keyspace, backoff) .await .expect_err("should return error"); } From 3dc2c5dedab69c56f3ff635c2a45912867329955 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 20 Jan 2026 22:33:22 +0800 Subject: [PATCH 04/10] cargo fmt Signed-off-by: Ping Yu --- src/transaction/client.rs | 10 +++++++--- src/transaction/lock.rs | 31 ++++++++++++++++--------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 9f9147b9..f0c21fdc 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -328,9 +328,13 @@ impl Client { ) -> Result> { let mut live_locks = locks; loop { - live_locks = - resolve_locks(live_locks, timestamp.clone(), self.pd.clone(), self.keyspace) - .await?; + live_locks = resolve_locks( + live_locks, + timestamp.clone(), + self.pd.clone(), + self.keyspace, + ) + .await?; if live_locks.is_empty() { return Ok(live_locks); } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 3defab83..51ca660a 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -159,23 +159,24 @@ async fn resolve_lock_with_retry( let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version, is_txn_file); - let plan_builder = match crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) - .single_region_with_store(store.clone()) - .await - { - Ok(plan_builder) => plan_builder, - Err(Error::LeaderNotFound { region }) => { - pd_client.invalidate_region_cache(region.clone()).await; - match backoff.next_delay_duration() { - Some(duration) => { - sleep(duration).await; - continue; + let plan_builder = + match crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) + .single_region_with_store(store.clone()) + .await + { + Ok(plan_builder) => plan_builder, + Err(Error::LeaderNotFound { region }) => { + pd_client.invalidate_region_cache(region.clone()).await; + match backoff.next_delay_duration() { + Some(duration) => { + sleep(duration).await; + continue; + } + None => return Err(Error::LeaderNotFound { region }), } - None => return Err(Error::LeaderNotFound { region }), } - } - Err(err) => return Err(err), - }; + Err(err) => return Err(err), + }; let plan = plan_builder.extract_error().plan(); match plan.execute().await { Ok(_) => { From 26d06d1d11e114054a0d516fd3734a10141ac6bd Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 20 Jan 2026 22:52:31 +0800 Subject: [PATCH 05/10] transaction: make resolve_locks work with scan_locks Signed-off-by: Ping Yu --- src/transaction/client.rs | 35 +++++++++++++++++++++++++++++++++-- tests/failpoint_tests.rs | 6 ++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index f0c21fdc..e9ac97a4 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -27,6 +27,7 @@ use crate::transaction::Transaction; use crate::transaction::TransactionOptions; use crate::Backoff; use crate::BoundRange; +use crate::Key; use crate::Result; // FIXME: cargo-culted value @@ -328,13 +329,43 @@ impl Client { ) -> Result> { let mut live_locks = locks; loop { - live_locks = resolve_locks( - live_locks, + let mut encoded_locks = live_locks; + if matches!(self.keyspace, Keyspace::Enable { .. }) { + encoded_locks = encoded_locks + .into_iter() + .map(|mut lock| { + lock.key = Key::from(lock.key) + .encode_keyspace(self.keyspace, KeyMode::Txn) + .into(); + lock.primary_lock = Key::from(lock.primary_lock) + .encode_keyspace(self.keyspace, KeyMode::Txn) + .into(); + for secondary in &mut lock.secondaries { + take_mut::take(secondary, |secondary| { + Key::from(secondary) + .encode_keyspace(self.keyspace, KeyMode::Txn) + .into() + }); + } + lock + }) + .collect(); + } + + let mut resolved_locks = resolve_locks( + encoded_locks, timestamp.clone(), self.pd.clone(), self.keyspace, ) .await?; + + if matches!(self.keyspace, Keyspace::Enable { .. }) { + use crate::request::TruncateKeyspace; + resolved_locks = resolved_locks.truncate_keyspace(self.keyspace); + } + + live_locks = resolved_locks; if live_locks.is_empty() { return Ok(live_locks); } diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index bec38ead..99094454 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -293,7 +293,9 @@ async fn txn_resolve_locks() -> Result<()> { fail::cfg("after-prewrite", "off").unwrap(); }} - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let key = b"resolve-locks-key".to_vec(); let keys = HashSet::from_iter(vec![key.clone()]); let mut txn = client @@ -379,7 +381,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> { ) .await?; let keys = write_data(&client, false, false).await?; - assert_eq!(count_locks(&client).await?, 0); + assert_eq!(wait_for_locks_count(&client, 0).await?, 0); let safepoint = client.current_timestamp().await?; let options = ResolveLocksOptions { From 560fc6a1626c859a62f129396397777013f61ae5 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 20 Jan 2026 23:20:21 +0800 Subject: [PATCH 06/10] keyspace: encode lock infos and simplify resolve_locks Signed-off-by: Ping Yu --- src/request/keyspace.rs | 63 +++++++++++++++++++++++++++++++++++++++ src/transaction/client.rs | 38 ++++------------------- 2 files changed, 68 insertions(+), 33 deletions(-) diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs index 94fd5f76..7815313c 100644 --- a/src/request/keyspace.rs +++ b/src/request/keyspace.rs @@ -183,6 +183,32 @@ impl TruncateKeyspace for Vec { } } +impl EncodeKeyspace for Vec { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + if !matches!(keyspace, Keyspace::Enable { .. }) { + return self; + } + for lock in &mut self { + take_mut::take(&mut lock.key, |key| { + Key::from(key).encode_keyspace(keyspace, key_mode).into() + }); + take_mut::take(&mut lock.primary_lock, |primary| { + Key::from(primary) + .encode_keyspace(keyspace, key_mode) + .into() + }); + for secondary in lock.secondaries.iter_mut() { + take_mut::take(secondary, |secondary| { + Key::from(secondary) + .encode_keyspace(keyspace, key_mode) + .into() + }); + } + } + self + } +} + fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] { let mut prefix = keyspace_id.to_be_bytes(); prefix[0] = match key_mode { @@ -276,6 +302,43 @@ mod tests { ); } + #[test] + fn test_encode_keyspace_lock_infos() { + let keyspace = Keyspace::Enable { keyspace_id: 1 }; + + let lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'k', b'1'], + primary_lock: vec![b'p', b'1'], + secondaries: vec![vec![b's', b'1'], vec![b's', b'2']], + ..Default::default() + }; + + let locks = vec![lock].encode_keyspace(keyspace, KeyMode::Txn); + assert_eq!(locks.len(), 1); + assert_eq!(locks[0].key, vec![b'x', 0, 0, 1, b'k', b'1']); + assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0, 1, b'p', b'1']); + assert_eq!( + locks[0].secondaries, + vec![ + vec![b'x', 0, 0, 1, b's', b'1'], + vec![b'x', 0, 0, 1, b's', b'2'] + ] + ); + } + + #[test] + fn test_encode_keyspace_lock_infos_noop_when_disabled() { + let lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'k', b'1'], + primary_lock: vec![b'p', b'1'], + secondaries: vec![vec![b's', b'1']], + ..Default::default() + }; + + let locks = vec![lock.clone()].encode_keyspace(Keyspace::Disable, KeyMode::Txn); + assert_eq!(locks, vec![lock]); + } + #[test] fn test_truncate_version() { let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); diff --git a/src/transaction/client.rs b/src/transaction/client.rs index e9ac97a4..b89796b8 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -27,7 +27,6 @@ use crate::transaction::Transaction; use crate::transaction::TransactionOptions; use crate::Backoff; use crate::BoundRange; -use crate::Key; use crate::Result; // FIXME: cargo-culted value @@ -327,45 +326,18 @@ impl Client { timestamp: Timestamp, mut backoff: Backoff, ) -> Result> { + use crate::request::TruncateKeyspace; + let mut live_locks = locks; loop { - let mut encoded_locks = live_locks; - if matches!(self.keyspace, Keyspace::Enable { .. }) { - encoded_locks = encoded_locks - .into_iter() - .map(|mut lock| { - lock.key = Key::from(lock.key) - .encode_keyspace(self.keyspace, KeyMode::Txn) - .into(); - lock.primary_lock = Key::from(lock.primary_lock) - .encode_keyspace(self.keyspace, KeyMode::Txn) - .into(); - for secondary in &mut lock.secondaries { - take_mut::take(secondary, |secondary| { - Key::from(secondary) - .encode_keyspace(self.keyspace, KeyMode::Txn) - .into() - }); - } - lock - }) - .collect(); - } - - let mut resolved_locks = resolve_locks( - encoded_locks, + let resolved_locks = resolve_locks( + live_locks.encode_keyspace(self.keyspace, KeyMode::Txn), timestamp.clone(), self.pd.clone(), self.keyspace, ) .await?; - - if matches!(self.keyspace, Keyspace::Enable { .. }) { - use crate::request::TruncateKeyspace; - resolved_locks = resolved_locks.truncate_keyspace(self.keyspace); - } - - live_locks = resolved_locks; + live_locks = resolved_locks.truncate_keyspace(self.keyspace); if live_locks.is_empty() { return Ok(live_locks); } From 067dfa26c0f242a60c91ccedbe62d298bdcb7562 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 20 Jan 2026 23:43:50 +0800 Subject: [PATCH 07/10] keyspace: Remove the "apiv2-no-prefix" feature (#522) Signed-off-by: Ping Yu --- Cargo.toml | 1 - Makefile | 4 ++-- README.md | 2 +- src/request/keyspace.rs | 6 +----- src/transaction/client.rs | 1 - 5 files changed, 4 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d6cd57d..c66ba1fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ prometheus = ["prometheus/push", "prometheus/process"] # Enable integration tests with a running TiKV and PD instance. # Use $PD_ADDRS, comma separated, to set the addresses the tests use. integration-tests = [] -apiv2-no-prefix = [] [lib] name = "tikv_client" diff --git a/Makefile b/Makefile index 6e22ead3..ef3fd397 100644 --- a/Makefile +++ b/Makefile @@ -5,11 +5,11 @@ export RUSTFLAGS=-Dwarnings export PD_ADDRS ?= 127.0.0.1:2379 export MULTI_REGION ?= 1 -ALL_FEATURES := integration-tests apiv2-no-prefix +ALL_FEATURES := integration-tests NEXTEST_ARGS := --config-file $(shell pwd)/config/nextest.toml -INTEGRATION_TEST_ARGS := --features "integration-tests apiv2-no-prefix" --test-threads 1 +INTEGRATION_TEST_ARGS := --features "integration-tests" --test-threads 1 RUN_INTEGRATION_TEST := cargo nextest run ${NEXTEST_ARGS} --all ${INTEGRATION_TEST_ARGS} diff --git a/README.md b/README.md index 3841cf63..7e5e6d9a 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The TiKV client is a Rust library (crate). To use this crate in your project, ad ```toml [dependencies] -tikv-client = "0.3" +tikv-client = "0.4" ``` ### Prerequisites diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs index 7815313c..1524501a 100644 --- a/src/request/keyspace.rs +++ b/src/request/keyspace.rs @@ -13,6 +13,7 @@ pub const TXN_KEY_PREFIX: u8 = b'x'; pub const KEYSPACE_PREFIX_LEN: usize = 4; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[non_exhaustive] pub enum Keyspace { Disable, Enable { @@ -23,7 +24,6 @@ pub enum Keyspace { /// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in /// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed /// through unchanged. - #[cfg(feature = "apiv2-no-prefix")] ApiV2NoPrefix, } @@ -38,7 +38,6 @@ impl Keyspace { match self { Keyspace::Disable => kvrpcpb::ApiVersion::V1, Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2, - #[cfg(feature = "apiv2-no-prefix")] Keyspace::ApiV2NoPrefix => kvrpcpb::ApiVersion::V2, } } @@ -356,7 +355,6 @@ mod tests { assert_eq!(key.truncate_keyspace(keyspace), expected_key); } - #[cfg(feature = "apiv2-no-prefix")] #[test] fn test_apiv2_no_prefix_api_version() { assert_eq!( @@ -365,7 +363,6 @@ mod tests { ); } - #[cfg(feature = "apiv2-no-prefix")] #[test] fn test_apiv2_no_prefix_encode_is_noop() { let keyspace = Keyspace::ApiV2NoPrefix; @@ -388,7 +385,6 @@ mod tests { ); } - #[cfg(feature = "apiv2-no-prefix")] #[test] fn test_apiv2_no_prefix_truncate_is_noop() { let keyspace = Keyspace::ApiV2NoPrefix; diff --git a/src/transaction/client.rs b/src/transaction/client.rs index b89796b8..8d506ef1 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -127,7 +127,6 @@ impl Client { /// keyspace/key-mode prefix, with a custom configuration. /// /// This is intended for **server-side embedding** use cases. `config.keyspace` must be unset. - #[cfg(feature = "apiv2-no-prefix")] pub async fn new_with_config_api_v2_no_prefix>( pd_endpoints: Vec, config: Config, From 7ef94f002fc41ed3093ff8f2bbe29489e9f8f19f Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 21 Jan 2026 10:36:16 +0800 Subject: [PATCH 08/10] tests: consolidate keyspace lock encode/truncate coverage Signed-off-by: Ping Yu --- src/request/keyspace.rs | 120 ++++++++++++++++++++++++++++++---------- 1 file changed, 92 insertions(+), 28 deletions(-) diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs index 1524501a..79f7225d 100644 --- a/src/request/keyspace.rs +++ b/src/request/keyspace.rs @@ -299,60 +299,103 @@ mod tests { mutation.encode_keyspace(keyspace, key_mode), expected_mutation ); - } - - #[test] - fn test_encode_keyspace_lock_infos() { - let keyspace = Keyspace::Enable { keyspace_id: 1 }; + let key_mode = KeyMode::Txn; let lock = crate::proto::kvrpcpb::LockInfo { key: vec![b'k', b'1'], primary_lock: vec![b'p', b'1'], secondaries: vec![vec![b's', b'1'], vec![b's', b'2']], ..Default::default() }; - - let locks = vec![lock].encode_keyspace(keyspace, KeyMode::Txn); + let locks = vec![lock].encode_keyspace(keyspace, key_mode); assert_eq!(locks.len(), 1); - assert_eq!(locks[0].key, vec![b'x', 0, 0, 1, b'k', b'1']); - assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0, 1, b'p', b'1']); + assert_eq!(locks[0].key, vec![b'x', 0, 0xDE, 0xAD, b'k', b'1']); + assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0xDE, 0xAD, b'p', b'1']); assert_eq!( locks[0].secondaries, vec![ - vec![b'x', 0, 0, 1, b's', b'1'], - vec![b'x', 0, 0, 1, b's', b'2'] + vec![b'x', 0, 0xDE, 0xAD, b's', b'1'], + vec![b'x', 0, 0xDE, 0xAD, b's', b'2'] ] ); } - #[test] - fn test_encode_keyspace_lock_infos_noop_when_disabled() { - let lock = crate::proto::kvrpcpb::LockInfo { - key: vec![b'k', b'1'], - primary_lock: vec![b'p', b'1'], - secondaries: vec![vec![b's', b'1']], - ..Default::default() - }; - - let locks = vec![lock.clone()].encode_keyspace(Keyspace::Disable, KeyMode::Txn); - assert_eq!(locks, vec![lock]); - } - #[test] fn test_truncate_version() { - let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); let keyspace = Keyspace::Enable { keyspace_id: 0xDEAD, }; + + let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); let expected_key = Key::from(vec![0xBE, 0xEF]); assert_eq!(key.truncate_keyspace(keyspace), expected_key); let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]); - let keyspace = Keyspace::Enable { - keyspace_id: 0xDEAD, - }; let expected_key = Key::from(vec![0xBE, 0xEF]); assert_eq!(key.truncate_keyspace(keyspace), expected_key); + + let pair = KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']); + let expected_pair = KvPair(Key::from(vec![b'k']), vec![b'v']); + assert_eq!(pair.truncate_keyspace(keyspace), expected_pair); + + let range = Range { + start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']), + end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']), + }; + let expected_range = Range { + start: Key::from(vec![b'a']), + end: Key::from(vec![b'b']), + }; + assert_eq!(range.truncate_keyspace(keyspace), expected_range); + + let ranges = vec![ + Range { + start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']), + end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']), + }, + Range { + start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'c']), + end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'd']), + }, + ]; + let expected_ranges = vec![ + Range { + start: Key::from(vec![b'a']), + end: Key::from(vec![b'b']), + }, + Range { + start: Key::from(vec![b'c']), + end: Key::from(vec![b'd']), + }, + ]; + assert_eq!(ranges.truncate_keyspace(keyspace), expected_ranges); + + let pairs = vec![ + KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']), + KvPair( + Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k', b'2']), + vec![b'v', b'2'], + ), + ]; + let expected_pairs = vec![ + KvPair(Key::from(vec![b'k']), vec![b'v']), + KvPair(Key::from(vec![b'k', b'2']), vec![b'v', b'2']), + ]; + assert_eq!(pairs.truncate_keyspace(keyspace), expected_pairs); + + let lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'x', 0, 0xDE, 0xAD, b'k'], + primary_lock: vec![b'x', 0, 0xDE, 0xAD, b'p'], + secondaries: vec![vec![b'x', 0, 0xDE, 0xAD, b's']], + ..Default::default() + }; + let expected_lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'k'], + primary_lock: vec![b'p'], + secondaries: vec![vec![b's']], + ..Default::default() + }; + assert_eq!(vec![lock].truncate_keyspace(keyspace), vec![expected_lock]); } #[test] @@ -383,6 +426,27 @@ mod tests { mutation.clone().encode_keyspace(keyspace, key_mode), mutation ); + + let lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'x', 0, 0, 0, b'k'], + primary_lock: vec![b'x', 0, 0, 0, b'p'], + secondaries: vec![vec![b'x', 0, 0, 0, b's']], + ..Default::default() + }; + let locks = vec![lock]; + assert_eq!(locks.clone().encode_keyspace(keyspace, key_mode), locks); + + let lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'k', b'1'], + primary_lock: vec![b'p', b'1'], + secondaries: vec![vec![b's', b'1']], + ..Default::default() + }; + let locks = vec![lock.clone()]; + assert_eq!( + locks.clone().encode_keyspace(Keyspace::Disable, key_mode), + locks + ); } #[test] From d3ca38b877c77a84bc529557b119ea92ec08f435 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 21 Jan 2026 11:08:12 +0800 Subject: [PATCH 09/10] polish Signed-off-by: Ping Yu --- src/request/plan.rs | 1 + src/transaction/client.rs | 4 ---- src/transaction/lock.rs | 1 + 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 699dc5bd..8bd15bb5 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -295,6 +295,7 @@ pub(crate) async fn handle_region_error( e: errorpb::Error, region_store: RegionStore, ) -> Result { + debug!("handle_region_error: {:?}", e); let ver_id = region_store.region_with_leader.ver_id(); let store_id = region_store.region_with_leader.get_store_id(); if let Some(not_leader) = e.not_leader { diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 8d506ef1..f042b391 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -341,10 +341,6 @@ impl Client { return Ok(live_locks); } - if backoff.is_none() { - return Ok(live_locks); - } - match backoff.next_delay_duration() { None => return Ok(live_locks), Some(delay_duration) => { diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 51ca660a..65ab494a 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -41,6 +41,7 @@ fn format_key_for_log(key: &[u8]) -> String { let prefix_len = key.len().min(16); format!("len={}, prefix={:?}", key.len(), &key[..prefix_len]) } + /// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved. /// /// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock, From 9b9e80ae982283290433197bc7a2f34c629f95cd Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 21 Jan 2026 11:38:15 +0800 Subject: [PATCH 10/10] invalidate region cache Signed-off-by: Ping Yu --- src/transaction/lock.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 65ab494a..5c5d2513 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -216,6 +216,7 @@ async fn resolve_lock_with_retry( } Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() { Some(duration) => { + pd_client.invalidate_region_cache(ver_id.clone()).await; if let Ok(store_id) = store.region_with_leader.get_store_id() { pd_client.invalidate_store_cache(store_id).await; }