diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5f855459..2e08e600 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,7 @@ jobs: env: CARGO_INCREMENTAL: 0 NEXTEST_PROFILE: ci - TIKV_VERSION: v8.5.1 + TIKV_VERSION: v8.5.5 RUST_LOG: info runs-on: ubuntu-latest steps: diff --git a/Cargo.toml b/Cargo.toml index 7e8e45f1..03d0198d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ prometheus = ["prometheus/push", "prometheus/process"] # Enable integration tests with a running TiKV and PD instance. # Use $PD_ADDRS, comma separated, to set the addresses the tests use. integration-tests = [] +apiv2-no-prefix = [] [lib] name = "tikv_client" diff --git a/Makefile b/Makefile index ef3fd397..6e22ead3 100644 --- a/Makefile +++ b/Makefile @@ -5,11 +5,11 @@ export RUSTFLAGS=-Dwarnings export PD_ADDRS ?= 127.0.0.1:2379 export MULTI_REGION ?= 1 -ALL_FEATURES := integration-tests +ALL_FEATURES := integration-tests apiv2-no-prefix NEXTEST_ARGS := --config-file $(shell pwd)/config/nextest.toml -INTEGRATION_TEST_ARGS := --features "integration-tests" --test-threads 1 +INTEGRATION_TEST_ARGS := --features "integration-tests apiv2-no-prefix" --test-threads 1 RUN_INTEGRATION_TEST := cargo nextest run ${NEXTEST_ARGS} --all ${INTEGRATION_TEST_ARGS} diff --git a/src/raw/client.rs b/src/raw/client.rs index 620531ee..f8991e48 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -112,8 +112,8 @@ impl Client { let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?); let keyspace = match config.keyspace { - Some(keyspace) => { - let keyspace = rpc.load_keyspace(&keyspace).await?; + Some(name) => { + let keyspace = rpc.load_keyspace(&name).await?; Keyspace::Enable { keyspace_id: keyspace.id, } diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs index 118e6fbf..94fd5f76 100644 --- a/src/request/keyspace.rs +++ b/src/request/keyspace.rs @@ -15,7 +15,16 @@ pub const KEYSPACE_PREFIX_LEN: usize = 4; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum Keyspace { Disable, - Enable { keyspace_id: u32 }, + Enable { + keyspace_id: u32, + }, + /// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix. + /// + /// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in + /// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed + /// through unchanged. + #[cfg(feature = "apiv2-no-prefix")] + ApiV2NoPrefix, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -29,6 +38,8 @@ impl Keyspace { match self { Keyspace::Disable => kvrpcpb::ApiVersion::V1, Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2, + #[cfg(feature = "apiv2-no-prefix")] + Keyspace::ApiV2NoPrefix => kvrpcpb::ApiVersion::V2, } } } @@ -43,12 +54,10 @@ pub trait TruncateKeyspace { impl EncodeKeyspace for Key { fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { - let prefix = match keyspace { - Keyspace::Disable => { - return self; - } - Keyspace::Enable { keyspace_id } => keyspace_prefix(keyspace_id, key_mode), + let Keyspace::Enable { keyspace_id } = keyspace else { + return self; }; + let prefix = keyspace_prefix(keyspace_id, key_mode); prepend_bytes(&mut self.0, &prefix); @@ -68,11 +77,9 @@ impl EncodeKeyspace for BoundRange { self.from = match self.from { Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)), Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)), - Bound::Unbounded => { - let key = Key::from(vec![]); - Bound::Included(key.encode_keyspace(keyspace, key_mode)) - } + Bound::Unbounded => Bound::Included(Key::EMPTY.encode_keyspace(keyspace, key_mode)), }; + self.to = match self.to { Bound::Included(key) if !key.is_empty() => { Bound::Included(key.encode_keyspace(keyspace, key_mode)) @@ -80,16 +87,15 @@ impl EncodeKeyspace for BoundRange { Bound::Excluded(key) if !key.is_empty() => { Bound::Excluded(key.encode_keyspace(keyspace, key_mode)) } - _ => { - let key = Key::from(vec![]); - let keyspace = match keyspace { - Keyspace::Disable => Keyspace::Disable, - Keyspace::Enable { keyspace_id } => Keyspace::Enable { + _ => match keyspace { + Keyspace::Enable { keyspace_id } => Bound::Excluded(Key::EMPTY.encode_keyspace( + Keyspace::Enable { keyspace_id: keyspace_id + 1, }, - }; - Bound::Excluded(key.encode_keyspace(keyspace, key_mode)) - } + key_mode, + )), + _ => Bound::Excluded(Key::EMPTY), + }, }; self } @@ -106,7 +112,7 @@ impl EncodeKeyspace for Mutation { impl TruncateKeyspace for Key { fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { - if let Keyspace::Disable = keyspace { + if !matches!(keyspace, Keyspace::Enable { .. }) { return self; } @@ -133,6 +139,9 @@ impl TruncateKeyspace for Range { impl TruncateKeyspace for Vec> { fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + if !matches!(keyspace, Keyspace::Enable { .. }) { + return self; + } for range in &mut self { take_mut::take(range, |range| range.truncate_keyspace(keyspace)); } @@ -142,6 +151,9 @@ impl TruncateKeyspace for Vec> { impl TruncateKeyspace for Vec { fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + if !matches!(keyspace, Keyspace::Enable { .. }) { + return self; + } for pair in &mut self { take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace)); } @@ -151,6 +163,9 @@ impl TruncateKeyspace for Vec { impl TruncateKeyspace for Vec { fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + if !matches!(keyspace, Keyspace::Enable { .. }) { + return self; + } for lock in &mut self { take_mut::take(&mut lock.key, |key| { Key::from(key).truncate_keyspace(keyspace).into() @@ -277,4 +292,66 @@ mod tests { let expected_key = Key::from(vec![0xBE, 0xEF]); assert_eq!(key.truncate_keyspace(keyspace), expected_key); } + + #[cfg(feature = "apiv2-no-prefix")] + #[test] + fn test_apiv2_no_prefix_api_version() { + assert_eq!( + Keyspace::ApiV2NoPrefix.api_version(), + kvrpcpb::ApiVersion::V2 + ); + } + + #[cfg(feature = "apiv2-no-prefix")] + #[test] + fn test_apiv2_no_prefix_encode_is_noop() { + let keyspace = Keyspace::ApiV2NoPrefix; + let key_mode = KeyMode::Txn; + + let key = Key::from(vec![b'x', 0, 0, 0, b'k']); + assert_eq!(key.clone().encode_keyspace(keyspace, key_mode), key); + + let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']); + assert_eq!(pair.clone().encode_keyspace(keyspace, key_mode), pair); + + let bound: BoundRange = + (Key::from(vec![b'x', 0, 0, 0, b'a'])..Key::from(vec![b'x', 0, 0, 0, b'b'])).into(); + assert_eq!(bound.clone().encode_keyspace(keyspace, key_mode), bound); + + let mutation = Mutation::Put(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![1, 2, 3]); + assert_eq!( + mutation.clone().encode_keyspace(keyspace, key_mode), + mutation + ); + } + + #[cfg(feature = "apiv2-no-prefix")] + #[test] + fn test_apiv2_no_prefix_truncate_is_noop() { + let keyspace = Keyspace::ApiV2NoPrefix; + + let key = Key::from(vec![b'x', 0, 0, 0, b'k']); + assert_eq!(key.clone().truncate_keyspace(keyspace), key); + + let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']); + assert_eq!(pair.clone().truncate_keyspace(keyspace), pair); + + let range = Range { + start: Key::from(vec![b'x', 0, 0, 0, b'a']), + end: Key::from(vec![b'x', 0, 0, 0, b'b']), + }; + assert_eq!(range.clone().truncate_keyspace(keyspace), range); + + let pairs = vec![pair]; + assert_eq!(pairs.clone().truncate_keyspace(keyspace), pairs); + + let lock = crate::proto::kvrpcpb::LockInfo { + key: vec![b'x', 0, 0, 0, b'k'], + primary_lock: vec![b'x', 0, 0, 0, b'p'], + secondaries: vec![vec![b'x', 0, 0, 0, b's']], + ..Default::default() + }; + let locks = vec![lock]; + assert_eq!(locks.clone().truncate_keyspace(keyspace), locks); + } } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index a9bb2269..22873b0c 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -110,8 +110,8 @@ impl Client { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?); let keyspace = match config.keyspace { - Some(keyspace) => { - let keyspace = pd.load_keyspace(&keyspace).await?; + Some(name) => { + let keyspace = pd.load_keyspace(&name).await?; Keyspace::Enable { keyspace_id: keyspace.id, } @@ -121,6 +121,30 @@ impl Client { Ok(Client { pd, keyspace }) } + /// Create a transactional [`Client`] that uses API V2 without adding or removing any API V2 + /// keyspace/key-mode prefix, with a custom configuration. + /// + /// This is intended for **server-side embedding** use cases. `config.keyspace` must be unset. + #[cfg(feature = "apiv2-no-prefix")] + pub async fn new_with_config_api_v2_no_prefix>( + pd_endpoints: Vec, + config: Config, + ) -> Result { + if config.keyspace.is_some() { + return Err(crate::Error::StringError( + "config.keyspace must be unset when using api-v2-no-prefix mode".to_owned(), + )); + } + + debug!("creating new transactional client (api-v2-no-prefix)"); + let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); + let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?); + Ok(Client { + pd, + keyspace: Keyspace::ApiV2NoPrefix, + }) + } + /// Creates a new optimistic [`Transaction`]. /// /// Use the transaction to issue requests like [`get`](Transaction::get) or