Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ export RUSTFLAGS=-Dwarnings

.PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all

PD_ADDRS ?= "127.0.0.1:2379"
MULTI_REGION ?= 1
export PD_ADDRS ?= 127.0.0.1:2379
export MULTI_REGION ?= 1

ALL_FEATURES := integration-tests

Expand Down
3 changes: 2 additions & 1 deletion src/common/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;

use log::debug;
use log::info;
use regex::Regex;
use tonic::transport::Certificate;
Expand Down Expand Up @@ -79,7 +80,7 @@ impl SecurityManager {
{
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");

info!("connect to rpc server at endpoint: {:?}", addr);
debug!("connect to rpc server at endpoint: {:?}", addr);

let mut builder = Channel::from_shared(addr)?
.tcp_keepalive(Some(Duration::from_secs(10)))
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ pub use crate::transaction::CheckLevel;
#[doc(inline)]
pub use crate::transaction::Client as TransactionClient;
#[doc(inline)]
pub use crate::transaction::CommitTTLParameters;
#[doc(inline)]
pub use crate::transaction::Snapshot;
#[doc(inline)]
pub use crate::transaction::Transaction;
Expand Down
4 changes: 2 additions & 2 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::prelude::*;
use futures::stream::BoxStream;
use log::info;
use log::debug;
use tokio::sync::RwLock;

use crate::compat::stream_fn;
Expand Down Expand Up @@ -347,7 +347,7 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
if let Some(client) = self.kv_client_cache.read().await.get(address) {
return Ok(client.clone());
};
info!("connect to tikv endpoint: {:?}", address);
debug!("connect to tikv endpoint: {:?}", address);
match self.kv_connect.connect(address).await {
Ok(client) => {
self.kv_client_cache
Expand Down
3 changes: 2 additions & 1 deletion src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use log::debug;
use log::error;
use log::info;
use log::warn;
Expand Down Expand Up @@ -183,7 +184,7 @@ impl Connection {

match members {
Some(members) => {
info!("All PD endpoints are consistent: {:?}", endpoints);
debug!("All PD endpoints are consistent: {:?}", endpoints);
Ok(members)
}
_ => Err(internal_err!("PD cluster failed to respond")),
Expand Down
3 changes: 1 addition & 2 deletions src/pd/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use futures::task::AtomicWaker;
use futures::task::Context;
use futures::task::Poll;
use log::debug;
use log::info;
use pin_project::pin_project;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -107,7 +106,7 @@ async fn run_tso(
sending_future_waker.wake();
}
// TODO: distinguish between unexpected stream termination and expected end of test
info!("TSO stream terminated");
debug!("TSO stream terminated");
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::ApiVersion;
use crate::proto::metapb;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::range_request;
use crate::request::plan::ResponseWithShard;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::DefaultProcessor;
use crate::request::KvRequest;
use crate::request::Merge;
use crate::request::Process;
use crate::request::RangeRequest;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::shardable_key;
Expand Down Expand Up @@ -227,6 +229,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
type Response = kvrpcpb::RawDeleteRangeResponse;
}

range_request!(kvrpcpb::RawDeleteRangeRequest);
shardable_range!(kvrpcpb::RawDeleteRangeRequest);

pub fn new_raw_scan_request(
Expand All @@ -250,6 +253,7 @@ impl KvRequest for kvrpcpb::RawScanRequest {
type Response = kvrpcpb::RawScanResponse;
}

range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan.
shardable_range!(kvrpcpb::RawScanRequest);

impl Merge<kvrpcpb::RawScanResponse> for Collect {
Expand Down
1 change: 1 addition & 0 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use self::plan_builder::SingleKey;
pub use self::shard::Batchable;
pub use self::shard::HasNextBatch;
pub use self::shard::NextBatch;
pub use self::shard::RangeRequest;
pub use self::shard::Shardable;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
Expand Down
41 changes: 39 additions & 2 deletions src/request/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::request::KvRequest;
use crate::request::Plan;
use crate::request::ResolveLock;
use crate::store::RegionStore;
use crate::store::Request;
use crate::Result;

macro_rules! impl_inner_shardable {
Expand Down Expand Up @@ -204,6 +205,32 @@ macro_rules! shardable_keys {
};
}

pub trait RangeRequest: Request {
fn is_reverse(&self) -> bool {
false
}
}

#[doc(hidden)]
#[macro_export]
macro_rules! range_request {
($type_: ty) => {
impl RangeRequest for $type_ {}
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! reversible_range_request {
($type_: ty) => {
impl RangeRequest for $type_ {
fn is_reverse(&self) -> bool {
self.reverse
}
}
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! shardable_range {
Expand All @@ -215,8 +242,13 @@ macro_rules! shardable_range {
&self,
pd_client: &Arc<impl $crate::pd::PdClient>,
) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> {
let start_key = self.start_key.clone().into();
let end_key = self.end_key.clone().into();
let mut start_key = self.start_key.clone().into();
let mut end_key = self.end_key.clone().into();
// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
// Therefore, before fetching the regions from PD, it is necessary to swap the values of start_key and end_key.
if self.is_reverse() {
std::mem::swap(&mut start_key, &mut end_key);
}
$crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
}

Expand All @@ -227,8 +259,13 @@ macro_rules! shardable_range {
) -> $crate::Result<()> {
self.set_context(store.region_with_leader.context()?);

// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
// As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
self.start_key = shard.0.into();
self.end_key = shard.1.into();
if self.is_reverse() {
std::mem::swap(&mut self.start_key, &mut self.end_key);
}
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/store/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl KvConnect for TikvConnect {
self.security_mgr
.connect(address, TikvClient::new)
.await
.map(|c| KvRpcClient::new(c, self.timeout))
.map(|c| KvRpcClient::new(c.max_decoding_message_size(usize::MAX), self.timeout))
}
}

Expand Down
75 changes: 74 additions & 1 deletion src/transaction/client.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::ops::Bound;
use std::sync::Arc;

use log::debug;
use log::info;
use log::warn;

use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
use crate::request::plan::CleanupLocksResult;
use crate::request::Plan;
use crate::timestamp::TimestampExt;
use crate::transaction::lock::cleanup_locks;
use crate::transaction::lock::ResolveLocksOptions;
use crate::transaction::lowering::new_scan_lock_request;
use crate::transaction::lowering::new_unsafe_destroy_range_request;
Expand All @@ -26,7 +30,7 @@ use crate::BoundRange;
use crate::Result;

// FIXME: cargo-culted value
const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
const SCAN_LOCK_BATCH_SIZE: u32 = 8192;

/// The TiKV transactional `Client` is used to interact with TiKV using transactional requests.
///
Expand Down Expand Up @@ -290,6 +294,75 @@ impl<Cod: Codec> Client<Cod> {
plan.execute().await
}

/// GC function from older version of library:
/// https://github.com/tikv/client-rust/blob/9ca9aa79c6e28a878e9ee9574fd96bbc2688ccea/src/transaction/client.rs
///
/// Cleans stale MVCC records in TiKV.
///
/// It is done by:
/// 1. resolve all locks with ts <= safepoint
/// 2. update safepoint to PD
///
/// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
/// We omit the second step "delete ranges" which is an optimization for TiDB.
pub async fn legacy_gc(&self, safepoint: Timestamp, cleanup_locks: bool) -> Result<bool> {
if cleanup_locks {
let resolved = self.legacy_cleanup_locks((..).into(), &safepoint).await?;

info!("resolved {resolved} locks, sending new safepoint to PD...");
}

// update safepoint to PD
let res: bool = self
.pd
.clone()
.update_safepoint(safepoint.version())
.await?;
if !res {
warn!("new safepoint != user-specified safepoint");
}

Ok(res)
}

pub async fn legacy_cleanup_locks(
&self,
mut range: BoundRange,
safepoint: &Timestamp,
) -> Result<usize> {
let mut total_resolved = 0;

loop {
let req = new_scan_lock_request(range.clone(), &safepoint, SCAN_LOCK_BATCH_SIZE);

let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(crate::request::Collect)
.plan();
let res: Vec<kvrpcpb::LockInfo> = plan.execute().await?;

if res.is_empty() {
break;
}

let mut start_key = res.last().unwrap().key.clone();
start_key.push(0);

range.from = Bound::Included(start_key.into());

debug!("scanned {} keys, new range: {:?}", res.len(), range);

let to_resolve = res.len();

cleanup_locks(res, self.pd.clone()).await?;

total_resolved += to_resolve;
}

Ok(total_resolved)
}

// For test.
// Note: `batch_size` must be >= expected number of locks.
#[cfg(feature = "integration-tests")]
Expand Down
58 changes: 58 additions & 0 deletions src/transaction/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,64 @@ pub async fn resolve_locks(
Ok(live_locks)
}

/// Resolve all locks, regardless of if they are expired or not
pub async fn cleanup_locks(
locks: Vec<kvrpcpb::LockInfo>,
pd_client: Arc<impl PdClient>,
) -> Result<()> {
debug!("cleaning up locks");
let expired_locks = locks; // assume all expired

// records the commit version of each primary lock (representing the status of the transaction)
let mut commit_versions: HashMap<u64, u64> = HashMap::new();
let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
for lock in expired_locks {
let region_ver_id = pd_client
.region_for_key(&lock.primary_lock.clone().into())
.await?
.ver_id();
// skip if the region is cleaned
if clean_regions
.get(&lock.lock_version)
.map(|regions| regions.contains(&region_ver_id))
.unwrap_or(false)
{
continue;
}

let commit_version = match commit_versions.get(&lock.lock_version) {
Some(&commit_version) => commit_version,
None => {
let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
let commit_version = plan.execute().await?;
commit_versions.insert(lock.lock_version, commit_version);
commit_version
}
};

let cleaned_region = resolve_lock_with_retry(
&lock.key,
lock.lock_version,
commit_version,
pd_client.clone(),
)
.await?;
clean_regions
.entry(lock.lock_version)
.or_default()
.insert(cleaned_region);
}

Ok(())
}

async fn resolve_lock_with_retry(
#[allow(clippy::ptr_arg)] key: &Vec<u8>,
start_version: u64,
Expand Down
1 change: 1 addition & 0 deletions src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) use lock::resolve_locks;
pub(crate) use lock::HasLocks;
pub use snapshot::Snapshot;
pub use transaction::CheckLevel;
pub use transaction::CommitTTLParameters;
#[doc(hidden)]
pub use transaction::HeartbeatOption;
pub use transaction::Transaction;
Expand Down
Loading