diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs
index 35ad4e64..79f7225d 100644
--- a/src/request/keyspace.rs
+++ b/src/request/keyspace.rs
@@ -182,6 +182,32 @@ impl TruncateKeyspace for Vec<kvrpcpb::LockInfo> {
     }
 }
 
+impl EncodeKeyspace for Vec<kvrpcpb::LockInfo> {
+    fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
+        if !matches!(keyspace, Keyspace::Enable { .. }) {
+            return self;
+        }
+        for lock in &mut self {
+            take_mut::take(&mut lock.key, |key| {
+                Key::from(key).encode_keyspace(keyspace, key_mode).into()
+            });
+            take_mut::take(&mut lock.primary_lock, |primary| {
+                Key::from(primary)
+                    .encode_keyspace(keyspace, key_mode)
+                    .into()
+            });
+            for secondary in lock.secondaries.iter_mut() {
+                take_mut::take(secondary, |secondary| {
+                    Key::from(secondary)
+                        .encode_keyspace(keyspace, key_mode)
+                        .into()
+                });
+            }
+        }
+        self
+    }
+}
+
 fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] {
     let mut prefix = keyspace_id.to_be_bytes();
     prefix[0] = match key_mode {
@@ -273,23 +299,103 @@ mod tests {
             mutation.encode_keyspace(keyspace, key_mode),
             expected_mutation
         );
+
+        let key_mode = KeyMode::Txn;
+        let lock = crate::proto::kvrpcpb::LockInfo {
+            key: vec![b'k', b'1'],
+            primary_lock: vec![b'p', b'1'],
+            secondaries: vec![vec![b's', b'1'], vec![b's', b'2']],
+            ..Default::default()
+        };
+        let locks = vec![lock].encode_keyspace(keyspace, key_mode);
+        assert_eq!(locks.len(), 1);
+        assert_eq!(locks[0].key, vec![b'x', 0, 0xDE, 0xAD, b'k', b'1']);
+        assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0xDE, 0xAD, b'p', b'1']);
+        assert_eq!(
+            locks[0].secondaries,
+            vec![
+                vec![b'x', 0, 0xDE, 0xAD, b's', b'1'],
+                vec![b'x', 0, 0xDE, 0xAD, b's', b'2']
+            ]
+        );
     }
 
     #[test]
     fn test_truncate_version() {
-        let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
         let keyspace = Keyspace::Enable {
             keyspace_id: 0xDEAD,
         };
+
+        let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
         let expected_key = Key::from(vec![0xBE, 0xEF]);
         assert_eq!(key.truncate_keyspace(keyspace), expected_key);
 
         let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
-        let keyspace = Keyspace::Enable {
-            keyspace_id: 0xDEAD,
-        };
         let expected_key = Key::from(vec![0xBE, 0xEF]);
         assert_eq!(key.truncate_keyspace(keyspace), expected_key);
+
+        let pair = KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']);
+        let expected_pair = KvPair(Key::from(vec![b'k']), vec![b'v']);
+        assert_eq!(pair.truncate_keyspace(keyspace), expected_pair);
+
+        let range = Range {
+            start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
+            end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
+        };
+        let expected_range = Range {
+            start: Key::from(vec![b'a']),
+            end: Key::from(vec![b'b']),
+        };
+        assert_eq!(range.truncate_keyspace(keyspace), expected_range);
+
+        let ranges = vec![
+            Range {
+                start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
+                end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
+            },
+            Range {
+                start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'c']),
+                end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'd']),
+            },
+        ];
+        let expected_ranges = vec![
+            Range {
+                start: Key::from(vec![b'a']),
+                end: Key::from(vec![b'b']),
+            },
+            Range {
+                start: Key::from(vec![b'c']),
+                end: Key::from(vec![b'd']),
+            },
+        ];
+        assert_eq!(ranges.truncate_keyspace(keyspace), expected_ranges);
+
+        let pairs = vec![
+            KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']),
+            KvPair(
+                Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k', b'2']),
+                vec![b'v', b'2'],
+            ),
+        ];
+        let expected_pairs = vec![
+            KvPair(Key::from(vec![b'k']), vec![b'v']),
+            KvPair(Key::from(vec![b'k', b'2']), vec![b'v', b'2']),
+        ];
+        assert_eq!(pairs.truncate_keyspace(keyspace), expected_pairs);
+
+        let lock = crate::proto::kvrpcpb::LockInfo {
+            key: vec![b'x', 0, 0xDE, 0xAD, b'k'],
+            primary_lock: vec![b'x', 0, 0xDE, 0xAD, b'p'],
+            secondaries: vec![vec![b'x', 0, 0xDE, 0xAD, b's']],
+            ..Default::default()
+        };
+        let expected_lock = crate::proto::kvrpcpb::LockInfo {
+            key: vec![b'k'],
+            primary_lock: vec![b'p'],
+            secondaries: vec![vec![b's']],
+            ..Default::default()
+        };
+        assert_eq!(vec![lock].truncate_keyspace(keyspace), vec![expected_lock]);
     }
 
     #[test]
@@ -320,6 +426,27 @@ mod tests {
             mutation.clone().encode_keyspace(keyspace, key_mode),
             mutation
         );
+
+        let lock = crate::proto::kvrpcpb::LockInfo {
+            key: vec![b'x', 0, 0, 0, b'k'],
+            primary_lock: vec![b'x', 0, 0, 0, b'p'],
+            secondaries: vec![vec![b'x', 0, 0, 0, b's']],
+            ..Default::default()
+        };
+        let locks = vec![lock];
+        assert_eq!(locks.clone().encode_keyspace(keyspace, key_mode), locks);
+
+        let lock = crate::proto::kvrpcpb::LockInfo {
+            key: vec![b'k', b'1'],
+            primary_lock: vec![b'p', b'1'],
+            secondaries: vec![vec![b's', b'1']],
+            ..Default::default()
+        };
+        let locks = vec![lock.clone()];
+        assert_eq!(
+            locks.clone().encode_keyspace(Keyspace::Disable, key_mode),
+            locks
+        );
     }
 
     #[test]
diff --git a/src/request/plan.rs b/src/request/plan.rs
index b87a2f47..8bd15bb5 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -88,7 +88,7 @@ impl StoreRequest for Dispatch {
 const MULTI_REGION_CONCURRENCY: usize = 16;
 const MULTI_STORES_CONCURRENCY: usize = 16;
 
-fn is_grpc_error(e: &Error) -> bool {
+pub(crate) fn is_grpc_error(e: &Error) -> bool {
     matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
 }
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index e28d452a..f042b391 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -9,6 +9,7 @@ use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
+use crate::proto::kvrpcpb;
 use crate::proto::pdpb::Timestamp;
 use crate::request::plan::CleanupLocksResult;
 use crate::request::EncodeKeyspace;
@@ -19,6 +20,7 @@ use crate::timestamp::TimestampExt;
 use crate::transaction::lock::ResolveLocksOptions;
 use crate::transaction::lowering::new_scan_lock_request;
 use crate::transaction::lowering::new_unsafe_destroy_range_request;
+use crate::transaction::resolve_locks;
 use crate::transaction::ResolveLocksContext;
 use crate::transaction::Snapshot;
 use crate::transaction::Transaction;
@@ -294,9 +296,7 @@ impl Client {
         plan.execute().await
     }
 
-    // For test.
     // Note: `batch_size` must be >= expected number of locks.
-    #[cfg(feature = "integration-tests")]
     pub async fn scan_locks(
         &self,
         safepoint: &Timestamp,
@@ -314,6 +314,42 @@ impl Client {
         Ok(plan.execute().await?.truncate_keyspace(self.keyspace))
     }
 
+    /// Resolves the given locks and returns any that remain live.
+    ///
+    /// This method retries until either all locks are resolved or the provided
+    /// `backoff` is exhausted. The `timestamp` is used as the caller start
+    /// timestamp when checking transaction status.
+    pub async fn resolve_locks(
+        &self,
+        locks: Vec<kvrpcpb::LockInfo>,
+        timestamp: Timestamp,
+        mut backoff: Backoff,
+    ) -> Result<Vec<kvrpcpb::LockInfo>> {
+        use crate::request::TruncateKeyspace;
+
+        let mut live_locks = locks;
+        loop {
+            let resolved_locks = resolve_locks(
+                live_locks.encode_keyspace(self.keyspace, KeyMode::Txn),
+                timestamp.clone(),
+                self.pd.clone(),
+                self.keyspace,
+            )
+            .await?;
+            live_locks = resolved_locks.truncate_keyspace(self.keyspace);
+            if live_locks.is_empty() {
+                return Ok(live_locks);
+            }
+
+            match backoff.next_delay_duration() {
+                None => return Ok(live_locks),
+                Some(delay_duration) => {
+                    tokio::time::sleep(delay_duration).await;
+                }
+            }
+        }
+    }
+
     /// Cleans up all keys in a range and quickly reclaim disk space.
     ///
     /// The range can span over multiple regions.
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index f899e8c9..5c5d2513 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -20,6 +20,8 @@ use crate::proto::kvrpcpb;
 use crate::proto::kvrpcpb::TxnInfo;
 use crate::proto::pdpb::Timestamp;
 use crate::region::RegionVerId;
+use crate::request::plan::handle_region_error;
+use crate::request::plan::is_grpc_error;
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::Keyspace;
@@ -35,8 +37,6 @@ use crate::transaction::requests::TransactionStatusKind;
 use crate::Error;
 use crate::Result;
 
-const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
-
 fn format_key_for_log(key: &[u8]) -> String {
     let prefix_len = key.len().min(16);
     format!("len={}, prefix={:?}", key.len(), &key[..prefix_len])
@@ -130,6 +130,7 @@ pub async fn resolve_locks(
             lock.is_txn_file,
             pd_client.clone(),
             keyspace,
+            OPTIMISTIC_BACKOFF,
         )
         .await?;
         clean_regions
@@ -148,21 +149,36 @@ async fn resolve_lock_with_retry(
     is_txn_file: bool,
     pd_client: Arc<impl PdClient>,
     keyspace: Keyspace,
+    mut backoff: Backoff,
 ) -> Result<RegionVerId> {
     debug!("resolving locks with retry");
-    // FIXME: Add backoff
-    let mut error = None;
-    for i in 0..RESOLVE_LOCK_RETRY_LIMIT {
-        debug!("resolving locks: attempt {}", (i + 1));
+    let mut attempt = 0;
+    loop {
+        attempt += 1;
+        debug!("resolving locks: attempt {}", attempt);
         let store = pd_client.clone().store_for_key(key.into()).await?;
         let ver_id = store.region_with_leader.ver_id();
         let request =
             requests::new_resolve_lock_request(start_version, commit_version, is_txn_file);
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request)
-            .single_region_with_store(store)
-            .await?
-            .extract_error()
-            .plan();
+        let plan_builder =
+            match crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request)
+                .single_region_with_store(store.clone())
+                .await
+            {
+                Ok(plan_builder) => plan_builder,
+                Err(Error::LeaderNotFound { region }) => {
+                    pd_client.invalidate_region_cache(region.clone()).await;
+                    match backoff.next_delay_duration() {
+                        Some(duration) => {
+                            sleep(duration).await;
+                            continue;
+                        }
+                        None => return Err(Error::LeaderNotFound { region }),
+                    }
+                }
+                Err(err) => return Err(err),
+            };
+        let plan = plan_builder.extract_error().plan();
         match plan.execute().await {
             Ok(_) => {
                 return Ok(ver_id);
             }
@@ -171,10 +187,17 @@
             Err(Error::ExtractedErrors(mut errors)) => {
                 // ResolveLockResponse can have at most 1 error
                 match errors.pop() {
-                    e @ Some(Error::RegionError(_)) => {
-                        error = e;
-                        continue;
-                    }
+                    Some(Error::RegionError(e)) => match backoff.next_delay_duration() {
+                        Some(duration) => {
+                            let region_error_resolved =
+                                handle_region_error(pd_client.clone(), *e, store.clone()).await?;
+                            if !region_error_resolved {
+                                sleep(duration).await;
+                            }
+                            continue;
+                        }
+                        None => return Err(Error::RegionError(e)),
+                    },
                     Some(Error::KeyError(key_err)) => {
                         // Keyspace is not truncated here because we need full key info for logging.
                         error!(
@@ -191,10 +214,20 @@ async fn resolve_lock_with_retry(
                     None => unreachable!(),
                 }
             }
+            Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() {
+                Some(duration) => {
+                    pd_client.invalidate_region_cache(ver_id.clone()).await;
+                    if let Ok(store_id) = store.region_with_leader.get_store_id() {
+                        pd_client.invalidate_store_cache(store_id).await;
+                    }
+                    sleep(duration).await;
+                    continue;
+                }
+                None => return Err(e),
+            },
             Err(e) => return Err(e),
         }
     }
-    Err(error.expect("no error is impossible"))
 }
 
 #[derive(Default, Clone)]
@@ -577,6 +610,7 @@ mod tests {
     use std::sync::atomic::AtomicUsize;
     use std::sync::atomic::Ordering;
 
+    use fail::FailScenario;
     use serial_test::serial;
 
     use super::*;
@@ -590,8 +624,17 @@ mod tests {
    #[tokio::test]
    #[serial]
    async fn test_resolve_lock_with_retry(#[case] keyspace: Keyspace) {
+        let _scenario = FailScenario::setup();
+
+        const MAX_REGION_ERROR_RETRIES: u32 = 10;
+        let backoff = Backoff::no_jitter_backoff(0, 0, MAX_REGION_ERROR_RETRIES);
+
         // Test resolve lock within retry limit
-        fail::cfg("region-error", "9*return").unwrap();
+        fail::cfg(
+            "region-error",
+            &format!("{}*return", MAX_REGION_ERROR_RETRIES),
+        )
+        .unwrap();
 
         let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
             |_: &dyn Any| {
@@ -608,15 +651,20 @@ mod tests {
 
         let key = vec![1];
         let region1 = MockPdClient::region1();
-        let resolved_region = resolve_lock_with_retry(&key, 1, 2, false, client.clone(), keyspace)
-            .await
-            .unwrap();
+        let resolved_region =
+            resolve_lock_with_retry(&key, 1, 2, false, client.clone(), keyspace, backoff.clone())
+                .await
+                .unwrap();
         assert_eq!(region1.ver_id(), resolved_region);
 
         // Test resolve lock over retry limit
-        fail::cfg("region-error", "10*return").unwrap();
+        fail::cfg(
+            "region-error",
+            &format!("{}*return", MAX_REGION_ERROR_RETRIES + 1),
+        )
+        .unwrap();
         let key = vec![100];
-        resolve_lock_with_retry(&key, 3, 4, false, client, keyspace)
+        resolve_lock_with_retry(&key, 3, 4, false, client, keyspace, backoff)
             .await
             .expect_err("should return error");
     }
diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs
index 20379bcc..99094454 100644
--- a/tests/failpoint_tests.rs
+++ b/tests/failpoint_tests.rs
@@ -282,6 +282,50 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+#[serial]
+async fn txn_resolve_locks() -> Result<()> {
+    init().await?;
+    let scenario = FailScenario::setup();
+
+    fail::cfg("after-prewrite", "return").unwrap();
+    defer! {{
+        fail::cfg("after-prewrite", "off").unwrap();
+    }}
+
+    let client =
+        TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace())
+            .await?;
+    let key = b"resolve-locks-key".to_vec();
+    let keys = HashSet::from_iter(vec![key.clone()]);
+    let mut txn = client
+        .begin_with_options(
+            TransactionOptions::new_optimistic()
+                .heartbeat_option(HeartbeatOption::NoHeartbeat)
+                .drop_check(CheckLevel::Warn),
+        )
+        .await?;
+    txn.put(key.clone(), b"value".to_vec()).await?;
+    assert!(txn.commit().await.is_err());
+
+    let safepoint = client.current_timestamp().await?;
+    let locks = client.scan_locks(&safepoint, vec![].., 1024).await?;
+    assert!(locks.iter().any(|lock| lock.key == key));
+
+    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+
+    let start_version = client.current_timestamp().await?;
+    let live_locks = client
+        .resolve_locks(locks, start_version, OPTIMISTIC_BACKOFF)
+        .await?;
+    assert!(live_locks.is_empty());
+    assert_eq!(count_locks(&client).await?, 0);
+    must_rollbacked(&client, keys).await;
+
+    scenario.teardown();
+    Ok(())
+}
+
 #[tokio::test]
 #[serial]
 async fn txn_cleanup_2pc_locks() -> Result<()> {
@@ -337,7 +381,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
     )
     .await?;
     let keys = write_data(&client, false, false).await?;
-    assert_eq!(count_locks(&client).await?, 0);
+    assert_eq!(wait_for_locks_count(&client, 0).await?, 0);
 
     let safepoint = client.current_timestamp().await?;
     let options = ResolveLocksOptions {
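
Note (not part of the diff): a minimal usage sketch of the new public `Client::resolve_locks` API, mirroring the `scan_locks` + `resolve_locks` calls made in the `txn_resolve_locks` test above. The `resolve_all_locks` helper name, the PD endpoint, and the backoff parameters are illustrative assumptions, not values taken from this patch.

    // Sketch only: the endpoint and backoff values below are placeholders.
    use tikv_client::{Backoff, TransactionClient};

    async fn resolve_all_locks() -> tikv_client::Result<()> {
        let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await?;

        // Scan up to 1024 locks below the safepoint, as the txn_resolve_locks
        // test above does.
        let safepoint = client.current_timestamp().await?;
        let locks = client.scan_locks(&safepoint, vec![].., 1024).await?;

        // resolve_locks retries with the supplied backoff and returns the
        // locks that are still live once the backoff is exhausted.
        let start_version = client.current_timestamp().await?;
        let backoff = Backoff::no_jitter_backoff(100, 3000, 10);
        let live_locks = client.resolve_locks(locks, start_version, backoff).await?;
        assert!(live_locks.is_empty());
        Ok(())
    }

Returning the still-live locks on backoff exhaustion, rather than an error, lets callers decide whether to retry with a larger backoff or surface the remaining locks to the user.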