Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub use key::KvPairTTL;
pub use kvpair::KvPair;
pub use value::Value;

struct HexRepr<'a>(pub &'a [u8]);
pub struct HexRepr<'a>(pub &'a [u8]);

impl fmt::Display for HexRepr<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down
10 changes: 6 additions & 4 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::Display;

use crate::proto::kvrpcpb;
use crate::Error;

Expand Down Expand Up @@ -164,11 +162,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
}
}

impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
match self {
Ok(x) => x.key_errors(),
Err(e) => Some(vec![Error::StringError(e.to_string())]),
Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)),
Err(e) => Some(vec![std::mem::replace(
e,
Error::StringError("".to_string()), // placeholder, no use.
)]),
}
}
}
Expand Down
53 changes: 50 additions & 3 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use std::time::Instant;
use derive_new::new;
use fail::fail_point;
use futures::prelude::*;
use log::warn;
use log::{debug, trace};
use log::{debug, error, info, trace, warn};
use tokio::time::Duration;

use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::kv::HexRepr;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
Expand Down Expand Up @@ -1285,7 +1285,7 @@ impl<PdC: PdClient> Committer<PdC> {
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
min_commit_ts.unwrap()
} else {
match self.commit_primary().await {
match self.commit_primary_with_retry().await {
Ok(commit_ts) => commit_ts,
Err(e) => {
return if self.undetermined {
Expand Down Expand Up @@ -1388,6 +1388,11 @@ impl<PdC: PdClient> Committer<PdC> {
.plan();
plan.execute()
.inspect_err(|e| {
debug!(
"commit primary error: {:?}, start_ts: {}",
e,
self.start_version.version()
);
// We don't know whether the transaction is committed or not if we fail to receive
// the response. Then, we mark the transaction as undetermined and propagate the
// error to the user.
Expand All @@ -1400,6 +1405,48 @@ impl<PdC: PdClient> Committer<PdC> {
Ok(commit_version)
}

async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
loop {
match self.commit_primary().await {
Ok(commit_version) => return Ok(commit_version),
Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
Some(Error::KeyError(key_err)) => {
if let Some(expired) = key_err.commit_ts_expired {
// Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
self.start_version.version());

let primary_key = self.primary_key.as_ref().unwrap();
if primary_key != expired.key.as_ref() {
error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
self.start_version.version(), HexRepr(&expired.key), primary_key);
return Err(Error::StringError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
}

// Do not retry for a txn which has a too large min_commit_ts.
// 3600000 << 18 = 943718400000
if expired
.min_commit_ts
.saturating_sub(expired.attempted_commit_ts)
> 943718400000
{
let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
expired.min_commit_ts, expired.attempted_commit_ts);
return Err(Error::StringError(msg));
}
continue;
} else {
return Err(Error::KeyError(key_err));
}
}
Some(err) => return Err(err),
None => unreachable!(),
},
Err(err) => return Err(err),
}
}
}

async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
debug!("committing secondary");
let mutations_len = self.mutations.len();
Expand Down
4 changes: 3 additions & 1 deletion tests/common/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ pub async fn get_region_count() -> Result<u64> {
.text()
.await
.map_err(|e| Error::StringError(e.to_string()))?;
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap();
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| {
panic!("invalid body: {:?}, error: {:?}", body, err);
});
value["count"]
.as_u64()
.ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned()))
Expand Down