Skip to content
135 changes: 131 additions & 4 deletions src/request/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,32 @@ impl TruncateKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
}
}

impl EncodeKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
if !matches!(keyspace, Keyspace::Enable { .. }) {
return self;
}
for lock in &mut self {
take_mut::take(&mut lock.key, |key| {
Key::from(key).encode_keyspace(keyspace, key_mode).into()
});
take_mut::take(&mut lock.primary_lock, |primary| {
Key::from(primary)
.encode_keyspace(keyspace, key_mode)
.into()
});
for secondary in lock.secondaries.iter_mut() {
take_mut::take(secondary, |secondary| {
Key::from(secondary)
.encode_keyspace(keyspace, key_mode)
.into()
});
}
}
self
}
}

fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] {
let mut prefix = keyspace_id.to_be_bytes();
prefix[0] = match key_mode {
Expand Down Expand Up @@ -273,23 +299,103 @@ mod tests {
mutation.encode_keyspace(keyspace, key_mode),
expected_mutation
);

let key_mode = KeyMode::Txn;
let lock = crate::proto::kvrpcpb::LockInfo {
key: vec![b'k', b'1'],
primary_lock: vec![b'p', b'1'],
secondaries: vec![vec![b's', b'1'], vec![b's', b'2']],
..Default::default()
};
let locks = vec![lock].encode_keyspace(keyspace, key_mode);
assert_eq!(locks.len(), 1);
assert_eq!(locks[0].key, vec![b'x', 0, 0xDE, 0xAD, b'k', b'1']);
assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0xDE, 0xAD, b'p', b'1']);
assert_eq!(
locks[0].secondaries,
vec![
vec![b'x', 0, 0xDE, 0xAD, b's', b'1'],
vec![b'x', 0, 0xDE, 0xAD, b's', b'2']
]
);
}

#[test]
fn test_truncate_version() {
let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
let keyspace = Keyspace::Enable {
keyspace_id: 0xDEAD,
};

let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
let expected_key = Key::from(vec![0xBE, 0xEF]);
assert_eq!(key.truncate_keyspace(keyspace), expected_key);

let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
let keyspace = Keyspace::Enable {
keyspace_id: 0xDEAD,
};
let expected_key = Key::from(vec![0xBE, 0xEF]);
assert_eq!(key.truncate_keyspace(keyspace), expected_key);

let pair = KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']);
let expected_pair = KvPair(Key::from(vec![b'k']), vec![b'v']);
assert_eq!(pair.truncate_keyspace(keyspace), expected_pair);

let range = Range {
start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
};
let expected_range = Range {
start: Key::from(vec![b'a']),
end: Key::from(vec![b'b']),
};
assert_eq!(range.truncate_keyspace(keyspace), expected_range);

let ranges = vec![
Range {
start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
},
Range {
start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'c']),
end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'd']),
},
];
let expected_ranges = vec![
Range {
start: Key::from(vec![b'a']),
end: Key::from(vec![b'b']),
},
Range {
start: Key::from(vec![b'c']),
end: Key::from(vec![b'd']),
},
];
assert_eq!(ranges.truncate_keyspace(keyspace), expected_ranges);

let pairs = vec![
KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']),
KvPair(
Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k', b'2']),
vec![b'v', b'2'],
),
];
let expected_pairs = vec![
KvPair(Key::from(vec![b'k']), vec![b'v']),
KvPair(Key::from(vec![b'k', b'2']), vec![b'v', b'2']),
];
assert_eq!(pairs.truncate_keyspace(keyspace), expected_pairs);

let lock = crate::proto::kvrpcpb::LockInfo {
key: vec![b'x', 0, 0xDE, 0xAD, b'k'],
primary_lock: vec![b'x', 0, 0xDE, 0xAD, b'p'],
secondaries: vec![vec![b'x', 0, 0xDE, 0xAD, b's']],
..Default::default()
};
let expected_lock = crate::proto::kvrpcpb::LockInfo {
key: vec![b'k'],
primary_lock: vec![b'p'],
secondaries: vec![vec![b's']],
..Default::default()
};
assert_eq!(vec![lock].truncate_keyspace(keyspace), vec![expected_lock]);
}

#[test]
Expand Down Expand Up @@ -320,6 +426,27 @@ mod tests {
mutation.clone().encode_keyspace(keyspace, key_mode),
mutation
);

let lock = crate::proto::kvrpcpb::LockInfo {
key: vec![b'x', 0, 0, 0, b'k'],
primary_lock: vec![b'x', 0, 0, 0, b'p'],
secondaries: vec![vec![b'x', 0, 0, 0, b's']],
..Default::default()
};
let locks = vec![lock];
assert_eq!(locks.clone().encode_keyspace(keyspace, key_mode), locks);

let lock = crate::proto::kvrpcpb::LockInfo {
key: vec![b'k', b'1'],
primary_lock: vec![b'p', b'1'],
secondaries: vec![vec![b's', b'1']],
..Default::default()
};
let locks = vec![lock.clone()];
assert_eq!(
locks.clone().encode_keyspace(Keyspace::Disable, key_mode),
locks
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<Req: KvRequest + StoreRequest> StoreRequest for Dispatch<Req> {
const MULTI_REGION_CONCURRENCY: usize = 16;
const MULTI_STORES_CONCURRENCY: usize = 16;

fn is_grpc_error(e: &Error) -> bool {
pub(crate) fn is_grpc_error(e: &Error) -> bool {
matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
}

Expand Down
40 changes: 38 additions & 2 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::plan::CleanupLocksResult;
use crate::request::EncodeKeyspace;
Expand All @@ -19,6 +20,7 @@ use crate::timestamp::TimestampExt;
use crate::transaction::lock::ResolveLocksOptions;
use crate::transaction::lowering::new_scan_lock_request;
use crate::transaction::lowering::new_unsafe_destroy_range_request;
use crate::transaction::resolve_locks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::Snapshot;
use crate::transaction::Transaction;
Expand Down Expand Up @@ -294,9 +296,7 @@ impl Client {
plan.execute().await
}

// For test.
// Note: `batch_size` must be >= expected number of locks.
#[cfg(feature = "integration-tests")]
pub async fn scan_locks(
&self,
safepoint: &Timestamp,
Expand All @@ -314,6 +314,42 @@ impl Client {
Ok(plan.execute().await?.truncate_keyspace(self.keyspace))
}

/// Resolves the given locks and returns any that remain live.
///
/// This method retries until either all locks are resolved or the provided
/// `backoff` is exhausted. The `timestamp` is used as the caller start
/// timestamp when checking transaction status.
pub async fn resolve_locks(
&self,
locks: Vec<kvrpcpb::LockInfo>,
timestamp: Timestamp,
mut backoff: Backoff,
) -> Result<Vec<kvrpcpb::LockInfo>> {
use crate::request::TruncateKeyspace;

let mut live_locks = locks;
loop {
let resolved_locks = resolve_locks(
live_locks.encode_keyspace(self.keyspace, KeyMode::Txn),
timestamp.clone(),
self.pd.clone(),
self.keyspace,
)
.await?;
live_locks = resolved_locks.truncate_keyspace(self.keyspace);
if live_locks.is_empty() {
return Ok(live_locks);
}

match backoff.next_delay_duration() {
None => return Ok(live_locks),
Some(delay_duration) => {
tokio::time::sleep(delay_duration).await;
}
}
}
}

/// Cleans up all keys in a range and quickly reclaim disk space.
///
/// The range can span over multiple regions.
Expand Down
Loading