Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
env:
CARGO_INCREMENTAL: 0
NEXTEST_PROFILE: ci
TIKV_VERSION: v8.5.1
TIKV_VERSION: v8.5.5
RUST_LOG: info
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ prometheus = ["prometheus/push", "prometheus/process"]
# Enable integration tests with a running TiKV and PD instance.
# Use $PD_ADDRS, comma separated, to set the addresses the tests use.
integration-tests = []
apiv2-no-prefix = []

[lib]
name = "tikv_client"
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ export RUSTFLAGS=-Dwarnings
export PD_ADDRS ?= 127.0.0.1:2379
export MULTI_REGION ?= 1

ALL_FEATURES := integration-tests
ALL_FEATURES := integration-tests apiv2-no-prefix

NEXTEST_ARGS := --config-file $(shell pwd)/config/nextest.toml

INTEGRATION_TEST_ARGS := --features "integration-tests" --test-threads 1
INTEGRATION_TEST_ARGS := --features "integration-tests apiv2-no-prefix" --test-threads 1

RUN_INTEGRATION_TEST := cargo nextest run ${NEXTEST_ARGS} --all ${INTEGRATION_TEST_ARGS}

Expand Down
4 changes: 2 additions & 2 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl Client<PdRpcClient> {
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?);
let keyspace = match config.keyspace {
Some(keyspace) => {
let keyspace = rpc.load_keyspace(&keyspace).await?;
Some(name) => {
let keyspace = rpc.load_keyspace(&name).await?;
Keyspace::Enable {
keyspace_id: keyspace.id,
}
Expand Down
115 changes: 96 additions & 19 deletions src/request/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ pub const KEYSPACE_PREFIX_LEN: usize = 4;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Keyspace {
Disable,
Enable { keyspace_id: u32 },
Enable {
keyspace_id: u32,
},
/// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
///
/// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in
/// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
/// through unchanged.
#[cfg(feature = "apiv2-no-prefix")]
ApiV2NoPrefix,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand All @@ -29,6 +38,8 @@ impl Keyspace {
match self {
Keyspace::Disable => kvrpcpb::ApiVersion::V1,
Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2,
#[cfg(feature = "apiv2-no-prefix")]
Keyspace::ApiV2NoPrefix => kvrpcpb::ApiVersion::V2,
}
}
}
Expand All @@ -43,12 +54,10 @@ pub trait TruncateKeyspace {

impl EncodeKeyspace for Key {
fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
let prefix = match keyspace {
Keyspace::Disable => {
return self;
}
Keyspace::Enable { keyspace_id } => keyspace_prefix(keyspace_id, key_mode),
let Keyspace::Enable { keyspace_id } = keyspace else {
return self;
};
let prefix = keyspace_prefix(keyspace_id, key_mode);

prepend_bytes(&mut self.0, &prefix);

Expand All @@ -68,28 +77,25 @@ impl EncodeKeyspace for BoundRange {
self.from = match self.from {
Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)),
Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)),
Bound::Unbounded => {
let key = Key::from(vec![]);
Bound::Included(key.encode_keyspace(keyspace, key_mode))
}
Bound::Unbounded => Bound::Included(Key::EMPTY.encode_keyspace(keyspace, key_mode)),
};

self.to = match self.to {
Bound::Included(key) if !key.is_empty() => {
Bound::Included(key.encode_keyspace(keyspace, key_mode))
}
Bound::Excluded(key) if !key.is_empty() => {
Bound::Excluded(key.encode_keyspace(keyspace, key_mode))
}
_ => {
let key = Key::from(vec![]);
let keyspace = match keyspace {
Keyspace::Disable => Keyspace::Disable,
Keyspace::Enable { keyspace_id } => Keyspace::Enable {
_ => match keyspace {
Keyspace::Enable { keyspace_id } => Bound::Excluded(Key::EMPTY.encode_keyspace(
Keyspace::Enable {
keyspace_id: keyspace_id + 1,
},
};
Bound::Excluded(key.encode_keyspace(keyspace, key_mode))
}
key_mode,
)),
_ => Bound::Excluded(Key::EMPTY),
},
};
self
}
Expand All @@ -106,7 +112,7 @@ impl EncodeKeyspace for Mutation {

impl TruncateKeyspace for Key {
fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
if let Keyspace::Disable = keyspace {
if !matches!(keyspace, Keyspace::Enable { .. }) {
return self;
}

Expand All @@ -133,6 +139,9 @@ impl TruncateKeyspace for Range<Key> {

impl TruncateKeyspace for Vec<Range<Key>> {
fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
if !matches!(keyspace, Keyspace::Enable { .. }) {
return self;
}
for range in &mut self {
take_mut::take(range, |range| range.truncate_keyspace(keyspace));
}
Expand All @@ -142,6 +151,9 @@ impl TruncateKeyspace for Vec<Range<Key>> {

impl TruncateKeyspace for Vec<KvPair> {
fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
if !matches!(keyspace, Keyspace::Enable { .. }) {
return self;
}
for pair in &mut self {
take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace));
}
Expand All @@ -151,6 +163,9 @@ impl TruncateKeyspace for Vec<KvPair> {

impl TruncateKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
if !matches!(keyspace, Keyspace::Enable { .. }) {
return self;
}
for lock in &mut self {
take_mut::take(&mut lock.key, |key| {
Key::from(key).truncate_keyspace(keyspace).into()
Expand Down Expand Up @@ -277,4 +292,66 @@ mod tests {
let expected_key = Key::from(vec![0xBE, 0xEF]);
assert_eq!(key.truncate_keyspace(keyspace), expected_key);
}

#[cfg(feature = "apiv2-no-prefix")]
#[test]
fn test_apiv2_no_prefix_api_version() {
assert_eq!(
Keyspace::ApiV2NoPrefix.api_version(),
kvrpcpb::ApiVersion::V2
);
}

#[cfg(feature = "apiv2-no-prefix")]
#[test]
fn test_apiv2_no_prefix_encode_is_noop() {
let keyspace = Keyspace::ApiV2NoPrefix;
let key_mode = KeyMode::Txn;

let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
assert_eq!(key.clone().encode_keyspace(keyspace, key_mode), key);

let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
assert_eq!(pair.clone().encode_keyspace(keyspace, key_mode), pair);

let bound: BoundRange =
(Key::from(vec![b'x', 0, 0, 0, b'a'])..Key::from(vec![b'x', 0, 0, 0, b'b'])).into();
assert_eq!(bound.clone().encode_keyspace(keyspace, key_mode), bound);

let mutation = Mutation::Put(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![1, 2, 3]);
assert_eq!(
mutation.clone().encode_keyspace(keyspace, key_mode),
mutation
);
}

#[cfg(feature = "apiv2-no-prefix")]
#[test]
fn test_apiv2_no_prefix_truncate_is_noop() {
let keyspace = Keyspace::ApiV2NoPrefix;

let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
assert_eq!(key.clone().truncate_keyspace(keyspace), key);

let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
assert_eq!(pair.clone().truncate_keyspace(keyspace), pair);

let range = Range {
start: Key::from(vec![b'x', 0, 0, 0, b'a']),
end: Key::from(vec![b'x', 0, 0, 0, b'b']),
};
assert_eq!(range.clone().truncate_keyspace(keyspace), range);

let pairs = vec![pair];
assert_eq!(pairs.clone().truncate_keyspace(keyspace), pairs);

let lock = crate::proto::kvrpcpb::LockInfo {
key: vec![b'x', 0, 0, 0, b'k'],
primary_lock: vec![b'x', 0, 0, 0, b'p'],
secondaries: vec![vec![b'x', 0, 0, 0, b's']],
..Default::default()
};
let locks = vec![lock];
assert_eq!(locks.clone().truncate_keyspace(keyspace), locks);
}
}
28 changes: 26 additions & 2 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ impl Client {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
let keyspace = match config.keyspace {
Some(keyspace) => {
let keyspace = pd.load_keyspace(&keyspace).await?;
Some(name) => {
let keyspace = pd.load_keyspace(&name).await?;
Keyspace::Enable {
keyspace_id: keyspace.id,
}
Expand All @@ -121,6 +121,30 @@ impl Client {
Ok(Client { pd, keyspace })
}

/// Create a transactional [`Client`] that uses API V2 without adding or removing any API V2
/// keyspace/key-mode prefix, with a custom configuration.
///
/// This is intended for **server-side embedding** use cases. `config.keyspace` must be unset.
#[cfg(feature = "apiv2-no-prefix")]
pub async fn new_with_config_api_v2_no_prefix<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client> {
if config.keyspace.is_some() {
return Err(crate::Error::StringError(
"config.keyspace must be unset when using api-v2-no-prefix mode".to_owned(),
));
}

debug!("creating new transactional client (api-v2-no-prefix)");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
Ok(Client {
pd,
keyspace: Keyspace::ApiV2NoPrefix,
})
}

/// Creates a new optimistic [`Transaction`].
///
/// Use the transaction to issue requests like [`get`](Transaction::get) or
Expand Down