Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tikv-client"
version = "0.3.0"
version = "0.4.0"
keywords = ["TiKV", "KV", "distributed-systems"]
license = "Apache-2.0"
authors = ["The TiKV Project Authors"]
Expand Down Expand Up @@ -46,7 +46,7 @@ serde_json = "1"
take_mut = "0.2.2"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.10", features = ["tls"] }
tonic = { version = "0.10", features = ["tls", "gzip"] }

[dev-dependencies]
clap = "2"
Expand Down
3 changes: 3 additions & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ max-open-files = 10000
api-version = 2
enable-ttl = true
reserve-space = "0MiB"

[server]
grpc-compression-type = "gzip"
15 changes: 15 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,25 @@ use serde_derive::Serialize;
///
/// See also [`TransactionOptions`](crate::TransactionOptions) which provides more ways to configure
/// requests.
///
/// This struct is marked `#[non_exhaustive]` to allow adding new configuration options in the
/// future without breaking downstream code. Construct it via [`Config::default`] and then use the
/// `with_*` methods (or field assignment) to customize it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub struct Config {
pub ca_path: Option<PathBuf>,
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub grpc_max_decoding_message_size: usize,
pub keyspace: Option<String>,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
const DEFAULT_GRPC_MAX_DECODING_MESSAGE_SIZE: usize = 4 * 1024 * 1024; // 4MB

impl Default for Config {
fn default() -> Self {
Expand All @@ -31,6 +38,7 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
grpc_max_decoding_message_size: DEFAULT_GRPC_MAX_DECODING_MESSAGE_SIZE,
keyspace: None,
}
}
Expand Down Expand Up @@ -86,6 +94,13 @@ impl Config {
self
}

/// Set the maximum decoding message size for gRPC.
#[must_use]
pub fn with_grpc_max_decoding_message_size(mut self, size: usize) -> Self {
self.grpc_max_decoding_message_size = size;
self
}

/// Set to use default keyspace.
///
/// Server should enable `storage.api-version = 2` to use this feature.
Expand Down
8 changes: 7 additions & 1 deletion src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,13 @@ impl PdRpcClient<TikvConnect, Cluster> {
) -> Result<PdRpcClient> {
PdRpcClient::new(
config.clone(),
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|security_mgr| {
TikvConnect::new(
security_mgr,
config.timeout,
config.grpc_max_decoding_message_size,
)
},
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
enable_codec,
)
Expand Down
8 changes: 7 additions & 1 deletion src/store/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;

use async_trait::async_trait;
use derive_new::new;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use super::Request;
Expand All @@ -25,6 +26,7 @@ pub trait KvConnect: Sized + Send + Sync + 'static {
pub struct TikvConnect {
security_mgr: Arc<SecurityManager>,
timeout: Duration,
grpc_max_decoding_message_size: usize,
}

#[async_trait]
Expand All @@ -33,7 +35,11 @@ impl KvConnect for TikvConnect {

async fn connect(&self, address: &str) -> Result<KvRpcClient> {
self.security_mgr
.connect(address, TikvClient::new)
.connect(address, move |channel| {
TikvClient::new(channel)
.max_decoding_message_size(self.grpc_max_decoding_message_size)
.accept_compressed(CompressionEncoding::Gzip)
})
.await
.map(|c| KvRpcClient::new(c, self.timeout))
}
Expand Down