From 54ef4f6a4adcead528389327c9a432d4729730f0 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 28 May 2025 17:21:38 +0800 Subject: [PATCH 1/2] transaction: Handle "commit ts expired" error (#491) Signed-off-by: Ping Yu --- src/common/errors.rs | 2 +- src/kv/mod.rs | 2 +- src/region_cache.rs | 6 ++-- src/store/errors.rs | 10 +++--- src/transaction/transaction.rs | 57 +++++++++++++++++++++++++++++++--- tests/common/ctl.rs | 10 +++--- 6 files changed, 69 insertions(+), 18 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index c5cd9b44..d69feedc 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -104,7 +104,7 @@ pub enum Error { #[error("{}", message)] InternalError { message: String }, #[error("{0}")] - StringError(String), + OtherError(String), #[error("PessimisticLock error: {:?}", inner)] PessimisticLockError { inner: Box, diff --git a/src/kv/mod.rs b/src/kv/mod.rs index bc5ac6e5..84247066 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -14,7 +14,7 @@ pub use key::KvPairTTL; pub use kvpair::KvPair; pub use value::Value; -struct HexRepr<'a>(pub &'a [u8]); +pub struct HexRepr<'a>(pub &'a [u8]); impl fmt::Display for HexRepr<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/src/region_cache.rs b/src/region_cache.rs index 8837de38..b8d7ced7 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -117,7 +117,7 @@ impl RegionCache { return self.read_through_region_by_id(id).await; } } - Err(Error::StringError(format!( + Err(Error::OtherError(format!( "Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times" ))) } @@ -316,7 +316,7 @@ mod test { .filter(|(_, r)| r.contains(&key.clone().into())) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) } async fn get_region_by_id( @@ -331,7 +331,7 @@ mod test { .filter(|(id, _)| id == &®ion_id) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) } async fn get_store( diff --git a/src/store/errors.rs b/src/store/errors.rs index 47b6bdfa..eb9f623a 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -1,7 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use std::fmt::Display; - use crate::proto::kvrpcpb; use crate::Error; @@ -164,11 +162,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { } } -impl HasKeyErrors for Result { +impl HasKeyErrors for Result { fn key_errors(&mut self) -> Option> { match self { Ok(x) => x.key_errors(), - Err(e) => Some(vec![Error::StringError(e.to_string())]), + Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)), + Err(e) => Some(vec![std::mem::replace( + e, + Error::OtherError("".to_string()), // placeholder, no use. + )]), } } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index b30ef6e4..e09811dc 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -9,12 +9,12 @@ use std::time::Instant; use derive_new::new; use fail::fail_point; use futures::prelude::*; -use log::warn; -use log::{debug, trace}; +use log::{debug, error, info, trace, warn}; use tokio::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; +use crate::kv::HexRepr; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; @@ -1271,7 +1271,7 @@ impl Committer { let min_commit_ts = self.prewrite().await?; fail_point!("after-prewrite", |_| { - Err(Error::StringError( + Err(Error::OtherError( "failpoint: after-prewrite return error".to_owned(), )) }); @@ -1285,7 +1285,7 @@ impl Committer { // FIXME: min_commit_ts == 0 => fallback to normal 2PC min_commit_ts.unwrap() } else { - match self.commit_primary().await { + match self.commit_primary_with_retry().await { Ok(commit_ts) => commit_ts, Err(e) => { return if self.undetermined { @@ -1388,6 +1388,11 @@ impl Committer { .plan(); plan.execute() .inspect_err(|e| { + debug!( + "commit primary error: {:?}, start_ts: {}", + e, + self.start_version.version() + ); // We don't know whether the transaction is committed or not if we fail to receive // the response. Then, we mark the transaction as undetermined and propagate the // error to the user. @@ -1400,6 +1405,48 @@ impl Committer { Ok(commit_version) } + async fn commit_primary_with_retry(&mut self) -> Result { + loop { + match self.commit_primary().await { + Ok(commit_version) => return Ok(commit_version), + Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { + Some(Error::KeyError(key_err)) => { + if let Some(expired) = key_err.commit_ts_expired { + // Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go + info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}", + self.start_version.version()); + + let primary_key = self.primary_key.as_ref().unwrap(); + if primary_key != expired.key.as_ref() { + error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}", + self.start_version.version(), HexRepr(&expired.key), primary_key); + return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string())); + } + + // Do not retry for a txn which has a too large min_commit_ts. + // 3600000 << 18 = 943718400000 + if expired + .min_commit_ts + .saturating_sub(expired.attempted_commit_ts) + > 943718400000 + { + let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}", + expired.min_commit_ts, expired.attempted_commit_ts); + return Err(Error::OtherError(msg)); + } + continue; + } else { + return Err(Error::KeyError(key_err)); + } + } + Some(err) => return Err(err), + None => unreachable!(), + }, + Err(err) => return Err(err), + } + } + } + async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> { debug!("committing secondary"); let mutations_len = self.mutations.len(); @@ -1417,7 +1464,7 @@ impl Committer { let percent = percent.unwrap().parse::().unwrap(); new_len = mutations_len * percent / 100; if new_len == 0 { - Err(Error::StringError( + Err(Error::OtherError( "failpoint: before-commit-secondary return error".to_owned(), )) } else { diff --git a/tests/common/ctl.rs b/tests/common/ctl.rs index 92dcacca..32781405 100644 --- a/tests/common/ctl.rs +++ b/tests/common/ctl.rs @@ -10,14 +10,16 @@ use crate::common::Result; pub async fn get_region_count() -> Result { let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0])) .await - .map_err(|e| Error::StringError(e.to_string()))?; + .map_err(|e| Error::OtherError(e.to_string()))?; let body = res .text() .await - .map_err(|e| Error::StringError(e.to_string()))?; - let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap(); + .map_err(|e| Error::OtherError(e.to_string()))?; + let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| { + panic!("invalid body: {:?}, error: {:?}", body, err); + }); value["count"] .as_u64() - .ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned())) + .ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned())) } From 0b51a72ce906588987f3f68de0f5adf257856517 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 20 Jan 2026 18:28:18 +0800 Subject: [PATCH 2/2] polish Signed-off-by: Ping Yu --- src/common/errors.rs | 2 +- src/region_cache.rs | 6 +++--- src/store/errors.rs | 2 +- src/transaction/transaction.rs | 8 ++++---- tests/common/ctl.rs | 6 +++--- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index d69feedc..c5cd9b44 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -104,7 +104,7 @@ pub enum Error { #[error("{}", message)] InternalError { message: String }, #[error("{0}")] - OtherError(String), + StringError(String), #[error("PessimisticLock error: {:?}", inner)] PessimisticLockError { inner: Box, diff --git a/src/region_cache.rs b/src/region_cache.rs index b8d7ced7..8837de38 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -117,7 +117,7 @@ impl RegionCache { return self.read_through_region_by_id(id).await; } } - Err(Error::OtherError(format!( + Err(Error::StringError(format!( "Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times" ))) } @@ -316,7 +316,7 @@ mod test { .filter(|(_, r)| r.contains(&key.clone().into())) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) } async fn get_region_by_id( @@ -331,7 +331,7 @@ mod test { .filter(|(id, _)| id == &®ion_id) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) } async fn get_store( diff --git a/src/store/errors.rs b/src/store/errors.rs index eb9f623a..5deb3fea 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -169,7 +169,7 @@ impl HasKeyErrors for Result { Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)), Err(e) => Some(vec![std::mem::replace( e, - Error::OtherError("".to_string()), // placeholder, no use. + Error::StringError("".to_string()), // placeholder, no use. )]), } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e09811dc..a029eac3 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1271,7 +1271,7 @@ impl Committer { let min_commit_ts = self.prewrite().await?; fail_point!("after-prewrite", |_| { - Err(Error::OtherError( + Err(Error::StringError( "failpoint: after-prewrite return error".to_owned(), )) }); @@ -1420,7 +1420,7 @@ impl Committer { if primary_key != expired.key.as_ref() { error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}", self.start_version.version(), HexRepr(&expired.key), primary_key); - return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string())); + return Err(Error::StringError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string())); } // Do not retry for a txn which has a too large min_commit_ts. @@ -1432,7 +1432,7 @@ impl Committer { { let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}", expired.min_commit_ts, expired.attempted_commit_ts); - return Err(Error::OtherError(msg)); + return Err(Error::StringError(msg)); } continue; } else { @@ -1464,7 +1464,7 @@ impl Committer { let percent = percent.unwrap().parse::().unwrap(); new_len = mutations_len * percent / 100; if new_len == 0 { - Err(Error::OtherError( + Err(Error::StringError( "failpoint: before-commit-secondary return error".to_owned(), )) } else { diff --git a/tests/common/ctl.rs b/tests/common/ctl.rs index 32781405..092c32bb 100644 --- a/tests/common/ctl.rs +++ b/tests/common/ctl.rs @@ -10,16 +10,16 @@ use crate::common::Result; pub async fn get_region_count() -> Result { let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0])) .await - .map_err(|e| Error::OtherError(e.to_string()))?; + .map_err(|e| Error::StringError(e.to_string()))?; let body = res .text() .await - .map_err(|e| Error::OtherError(e.to_string()))?; + .map_err(|e| Error::StringError(e.to_string()))?; let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| { panic!("invalid body: {:?}, error: {:?}", body, err); }); value["count"] .as_u64() - .ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned())) + .ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned())) }