From 6aa0a37a3ea948e4389792af5c6a328a3a095bd9 Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Sun, 10 Jul 2022 22:27:40 +0800 Subject: [PATCH 1/7] Feat(config_center): add config center module. --- Cargo.toml | 1 + config_center/Cargo.toml | 8 ++++ config_center/src/URL.rs | 38 +++++++++++++++++ config_center/src/config_changed_event.rs | 35 ++++++++++++++++ config_center/src/configuration_listener.rs | 23 ++++++++++ config_center/src/dynamic_configuration.rs | 36 ++++++++++++++++ .../src/dynamic_configuration_factory.rs | 23 ++++++++++ .../src/etcd_dynamic_configuration.rs | 42 +++++++++++++++++++ config_center/src/lib.rs | 32 ++++++++++++++ 9 files changed, 238 insertions(+) create mode 100644 config_center/Cargo.toml create mode 100644 config_center/src/URL.rs create mode 100644 config_center/src/config_changed_event.rs create mode 100644 config_center/src/configuration_listener.rs create mode 100644 config_center/src/dynamic_configuration.rs create mode 100644 config_center/src/dynamic_configuration_factory.rs create mode 100644 config_center/src/etcd_dynamic_configuration.rs create mode 100644 config_center/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 970aeb16..46d45d5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,6 @@ members = [ "metadata", "common", "config", + "config_center", "dubbo" ] diff --git a/config_center/Cargo.toml b/config_center/Cargo.toml new file mode 100644 index 00000000..46f0da67 --- /dev/null +++ b/config_center/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "config_center" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/config_center/src/URL.rs b/config_center/src/URL.rs new file mode 100644 index 00000000..cb685ce0 --- /dev/null +++ b/config_center/src/URL.rs @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; + +pub struct URL { + parameters: HashMap, +} + +impl URL { + + pub fn get_parameter(&self, key: String, defaultValue: String) -> String { + let value = match self.parameters.get(key.as_str()) { + Some(value) => value.clone(), + None => { + defaultValue.clone() + }, + }; + if value.is_empty() { + return defaultValue; + } + value + } +} \ No newline at end of file diff --git a/config_center/src/config_changed_event.rs b/config_center/src/config_changed_event.rs new file mode 100644 index 00000000..2f614431 --- /dev/null +++ b/config_center/src/config_changed_event.rs @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[derive(Debug)] +pub struct ConfigChangedEvent { + + pub key: String, + + pub group: String, + + pub content: String, + + pub changeType: ConfigChangeType, +} + +#[derive(Debug)] +pub enum ConfigChangeType { + ADDED, + MODIFIED, + DELETED, +} \ No newline at end of file diff --git a/config_center/src/configuration_listener.rs b/config_center/src/configuration_listener.rs new file mode 100644 index 00000000..4c37fa38 --- /dev/null +++ b/config_center/src/configuration_listener.rs @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::config_changed_event::ConfigChangedEvent; + +pub trait ConfigurationListener { + + fn process(event: ConfigChangedEvent); +} \ No newline at end of file diff --git a/config_center/src/dynamic_configuration.rs b/config_center/src/dynamic_configuration.rs new file mode 100644 index 00000000..12148e1b --- /dev/null +++ b/config_center/src/dynamic_configuration.rs @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; +use crate::configuration_listener::ConfigurationListener; + +pub trait DynamicConfiguration { + + fn add_listener(key: String, listener: impl ConfigurationListener); + + fn remove_listener(key: String, listener: impl ConfigurationListener); + + // TODO how to override + + fn get_config(key: String, group: String, timeout: i32) -> String; + + fn get_properties(key: String, group: String, timeout: i32) -> String; + + fn publish_config(key: String, group: String, content: String) -> bool; + + fn get_config_keys(group: String) -> HashMap; +} \ No newline at end of file diff --git a/config_center/src/dynamic_configuration_factory.rs b/config_center/src/dynamic_configuration_factory.rs new file mode 100644 index 00000000..91ee4cc3 --- /dev/null +++ b/config_center/src/dynamic_configuration_factory.rs @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::URL::URL; + +pub trait DynamicConfigurationFactory { + + fn get_dynamic_configuration(&self, url: URL) -> T; +} \ No newline at end of file diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs new file mode 100644 index 00000000..d96301a5 --- /dev/null +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::URL::URL; + +pub struct EtcdDynamicConfiguration { + + /** + * The final root path would be: /$NAME_SPACE/config + */ + pub rootPath: String, +} + +const CONFIG_NAMESPACE_KEY: &str = "namespace"; + +const DEFAULT_GROUP: &str = "dubbo"; + +impl EtcdDynamicConfiguration { + + pub fn new(&self, url: URL) -> Self { + let mut rootPath = String::from("/"); + rootPath.push_str(url.get_parameter(String::from(CONFIG_NAMESPACE_KEY), String::from(DEFAULT_GROUP)).as_str()); + rootPath.push_str("/config"); + EtcdDynamicConfiguration { + rootPath, + } + } +} \ No newline at end of file diff --git a/config_center/src/lib.rs b/config_center/src/lib.rs new file mode 100644 index 00000000..6151823b --- /dev/null +++ b/config_center/src/lib.rs @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod dynamic_configuration_factory; +mod configuration_listener; +mod config_changed_event; +mod dynamic_configuration; +mod URL; +mod etcd_dynamic_configuration; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + let result = 2 + 2; + assert_eq!(result, 4); + } +} From 51db8a3b28d5e46106fa6eca7c220ab5008df3d8 Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Mon, 11 Jul 2022 11:00:51 +0800 Subject: [PATCH 2/7] Feat(config_center): add config center etcd client. --- config_center/Cargo.toml | 3 + config_center/src/config_changed_event.rs | 2 +- config_center/src/dynamic_configuration.rs | 14 +++-- .../src/dynamic_configuration_factory.rs | 2 +- .../src/etcd_dynamic_configuration.rs | 57 ++++++++++++++++--- config_center/src/lib.rs | 2 +- config_center/src/{URL.rs => url.rs} | 6 +- 7 files changed, 67 insertions(+), 19 deletions(-) rename config_center/src/{URL.rs => url.rs} (88%) diff --git a/config_center/Cargo.toml b/config_center/Cargo.toml index 46f0da67..3cb1cf2e 100644 --- a/config_center/Cargo.toml +++ b/config_center/Cargo.toml @@ -6,3 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +etcd-client = "0.9.2" +tokio = { version = "1.0", features = ["full"] } +async-trait = "0.1.56" diff --git a/config_center/src/config_changed_event.rs b/config_center/src/config_changed_event.rs index 2f614431..1e6bb6b3 100644 --- a/config_center/src/config_changed_event.rs +++ b/config_center/src/config_changed_event.rs @@ -24,7 +24,7 @@ pub struct ConfigChangedEvent { pub content: String, - pub changeType: ConfigChangeType, + pub change_type: ConfigChangeType, } #[derive(Debug)] diff --git a/config_center/src/dynamic_configuration.rs b/config_center/src/dynamic_configuration.rs index 12148e1b..1702a843 100644 --- a/config_center/src/dynamic_configuration.rs +++ b/config_center/src/dynamic_configuration.rs @@ -17,20 +17,22 @@ use std::collections::HashMap; use crate::configuration_listener::ConfigurationListener; +use async_trait::async_trait; +#[async_trait] pub trait DynamicConfiguration { - fn add_listener(key: String, listener: impl ConfigurationListener); + fn add_listener(&self, key: String, listener: impl ConfigurationListener); - fn remove_listener(key: String, listener: impl ConfigurationListener); + fn remove_listener(&self, key: String, listener: impl ConfigurationListener); // TODO how to override - fn get_config(key: String, group: String, timeout: i32) -> String; + async fn get_config(&mut self, key: String, group: String, timeout: i32) -> String; - fn get_properties(key: String, group: String, timeout: i32) -> String; + fn get_properties(&self, key: String, group: String, timeout: i32) -> String; - fn publish_config(key: String, group: String, content: String) -> bool; + fn publish_config(&self, key: String, group: String, content: String) -> bool; - fn get_config_keys(group: String) -> HashMap; + fn get_config_keys(&self, group: String) -> HashMap; } \ No newline at end of file diff --git a/config_center/src/dynamic_configuration_factory.rs b/config_center/src/dynamic_configuration_factory.rs index 91ee4cc3..d5d186b8 100644 --- a/config_center/src/dynamic_configuration_factory.rs +++ b/config_center/src/dynamic_configuration_factory.rs @@ -15,7 +15,7 @@ * limitations under the License. */ -use crate::URL::URL; +use crate::url::URL; pub trait DynamicConfigurationFactory { diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs index d96301a5..baf8ed6c 100644 --- a/config_center/src/etcd_dynamic_configuration.rs +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -15,14 +15,21 @@ * limitations under the License. */ -use crate::URL::URL; +use std::collections::HashMap; +use etcd_client::Client; +use crate::configuration_listener::ConfigurationListener; +use crate::dynamic_configuration::DynamicConfiguration; +use crate::url::URL; +use async_trait::async_trait; pub struct EtcdDynamicConfiguration { /** * The final root path would be: /$NAME_SPACE/config */ - pub rootPath: String, + pub root_path: String, + + pub client: Client, } const CONFIG_NAMESPACE_KEY: &str = "namespace"; @@ -31,12 +38,48 @@ const DEFAULT_GROUP: &str = "dubbo"; impl EtcdDynamicConfiguration { - pub fn new(&self, url: URL) -> Self { - let mut rootPath = String::from("/"); - rootPath.push_str(url.get_parameter(String::from(CONFIG_NAMESPACE_KEY), String::from(DEFAULT_GROUP)).as_str()); - rootPath.push_str("/config"); + pub async fn new(&self, url: URL) -> Self { + let client = Client::connect(["localhost:2379"], None).await.unwrap(); + let mut root_path = String::from("/"); + root_path.push_str(url.get_parameter(String::from(CONFIG_NAMESPACE_KEY), String::from(DEFAULT_GROUP)).as_str()); + root_path.push_str("/config"); EtcdDynamicConfiguration { - rootPath, + root_path, + client, + } + } +} + +#[async_trait] +impl DynamicConfiguration for EtcdDynamicConfiguration { + fn add_listener(&self, key: String, listener: impl ConfigurationListener) { + todo!() + } + + fn remove_listener(&self, key: String, listener: impl ConfigurationListener) { + todo!() + } + + async fn get_config(&mut self, key: String, group: String, timeout: i32) -> String { + if key.is_empty() { + return String::from(""); + } + let resp = self.client.get(key, None).await.unwrap(); + if let Some(kv) = resp.kvs().first() { + return kv.value_str().unwrap().to_string(); } + return String::from(""); + } + + fn get_properties(&self, key: String, group: String, timeout: i32) -> String { + todo!() + } + + fn publish_config(&self, key: String, group: String, content: String) -> bool { + todo!() + } + + fn get_config_keys(&self, group: String) -> HashMap { + todo!() } } \ No newline at end of file diff --git a/config_center/src/lib.rs b/config_center/src/lib.rs index 6151823b..bf39d8f5 100644 --- a/config_center/src/lib.rs +++ b/config_center/src/lib.rs @@ -19,7 +19,7 @@ mod dynamic_configuration_factory; mod configuration_listener; mod config_changed_event; mod dynamic_configuration; -mod URL; +mod url; mod etcd_dynamic_configuration; #[cfg(test)] diff --git a/config_center/src/URL.rs b/config_center/src/url.rs similarity index 88% rename from config_center/src/URL.rs rename to config_center/src/url.rs index cb685ce0..f2f3076e 100644 --- a/config_center/src/URL.rs +++ b/config_center/src/url.rs @@ -23,15 +23,15 @@ pub struct URL { impl URL { - pub fn get_parameter(&self, key: String, defaultValue: String) -> String { + pub fn get_parameter(&self, key: String, default_value: String) -> String { let value = match self.parameters.get(key.as_str()) { Some(value) => value.clone(), None => { - defaultValue.clone() + default_value.clone() }, }; if value.is_empty() { - return defaultValue; + return default_value; } value } From 2ae5d39cbe07d21ca4ebd42f8cccd0fc75438f65 Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Mon, 18 Jul 2022 16:57:35 +0800 Subject: [PATCH 3/7] Feat(config_center): add etcd client watch listener feature. --- config_center/Cargo.toml | 1 + config_center/src/configuration_listener.rs | 4 +- config_center/src/dynamic_configuration.rs | 12 +- .../src/etcd_dynamic_configuration.rs | 167 ++++++++++++++++-- config_center/src/key.rs | 20 +++ config_center/src/lib.rs | 3 +- config_center/src/url.rs | 7 + 7 files changed, 188 insertions(+), 26 deletions(-) create mode 100644 config_center/src/key.rs diff --git a/config_center/Cargo.toml b/config_center/Cargo.toml index 3cb1cf2e..5e1f6569 100644 --- a/config_center/Cargo.toml +++ b/config_center/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" etcd-client = "0.9.2" tokio = { version = "1.0", features = ["full"] } async-trait = "0.1.56" +rand = "0.8.5" diff --git a/config_center/src/configuration_listener.rs b/config_center/src/configuration_listener.rs index 4c37fa38..43a5d035 100644 --- a/config_center/src/configuration_listener.rs +++ b/config_center/src/configuration_listener.rs @@ -19,5 +19,7 @@ use crate::config_changed_event::ConfigChangedEvent; pub trait ConfigurationListener { - fn process(event: ConfigChangedEvent); + fn process(&self, event: ConfigChangedEvent); + + fn get_type(&self) -> String; } \ No newline at end of file diff --git a/config_center/src/dynamic_configuration.rs b/config_center/src/dynamic_configuration.rs index 1702a843..3e057c26 100644 --- a/config_center/src/dynamic_configuration.rs +++ b/config_center/src/dynamic_configuration.rs @@ -15,24 +15,24 @@ * limitations under the License. */ -use std::collections::HashMap; +use std::collections::HashSet; use crate::configuration_listener::ConfigurationListener; use async_trait::async_trait; #[async_trait] pub trait DynamicConfiguration { - fn add_listener(&self, key: String, listener: impl ConfigurationListener); + async fn add_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send); - fn remove_listener(&self, key: String, listener: impl ConfigurationListener); + async fn remove_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send); // TODO how to override async fn get_config(&mut self, key: String, group: String, timeout: i32) -> String; - fn get_properties(&self, key: String, group: String, timeout: i32) -> String; + async fn get_properties(&mut self, key: String, group: String, timeout: i32) -> String; - fn publish_config(&self, key: String, group: String, content: String) -> bool; + async fn publish_config(&mut self, key: String, group: String, content: String) -> bool; - fn get_config_keys(&self, group: String) -> HashMap; + async fn get_config_keys(&mut self, group: String) -> HashSet; } \ No newline at end of file diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs index baf8ed6c..5af7cc4a 100644 --- a/config_center/src/etcd_dynamic_configuration.rs +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -15,12 +15,14 @@ * limitations under the License. */ -use std::collections::HashMap; -use etcd_client::Client; +use std::borrow::BorrowMut; +use std::collections::{HashMap, HashSet}; +use etcd_client::{Client, GetOptions, Watcher, WatchOptions, WatchStream}; use crate::configuration_listener::ConfigurationListener; use crate::dynamic_configuration::DynamicConfiguration; -use crate::url::URL; use async_trait::async_trait; +use rand::Rng; +use crate::url::URL; pub struct EtcdDynamicConfiguration { @@ -30,34 +32,106 @@ pub struct EtcdDynamicConfiguration { pub root_path: String, pub client: Client, + + pub url: URL, + + pub watcher: Watcher, + + pub stream: WatchStream, + + pub watch_listener_map: HashMap>, } const CONFIG_NAMESPACE_KEY: &str = "namespace"; const DEFAULT_GROUP: &str = "dubbo"; +const PATH_SEPARATOR: &str = "/"; + impl EtcdDynamicConfiguration { - pub async fn new(&self, url: URL) -> Self { - let client = Client::connect(["localhost:2379"], None).await.unwrap(); - let mut root_path = String::from("/"); - root_path.push_str(url.get_parameter(String::from(CONFIG_NAMESPACE_KEY), String::from(DEFAULT_GROUP)).as_str()); + pub async fn new(self, url: URL) -> Self { + let mut client = Client::connect(["localhost:2379"], None).await.unwrap(); + let mut root_path = String::from(PATH_SEPARATOR); + root_path.push_str(url.get_parameter(CONFIG_NAMESPACE_KEY.to_string(), DEFAULT_GROUP.to_string()).as_str()); root_path.push_str("/config"); + let (watcher, stream) = client.watch("/", None).await.unwrap(); + let watch_listener_map = HashMap::new(); + + + // while let Some(resp) = stream.message().await? { + // println!("[{:?}] receive watch response", resp.watch_id()); + // for event in resp.events() { + // println!("event type: {:?}", event.event_type()); + // if let Some(kv) = event.kv() { + // println!("kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?); + // } + // if EventType::Delete == event.event_type() { + // watcher.cancel_by_id(resp.watch_id()).await?; + // } + // } + // } + + + + EtcdDynamicConfiguration { root_path, client, + url, + watcher, + stream, + watch_listener_map, + } + } + + pub fn get_path(&self, key: String, group: String) -> String { + if key.len() == 0 { + return self.build_path(group); } + self.build_path(group.clone()) + PATH_SEPARATOR + key.as_str() + } + + pub fn build_path(&self, mut group: String) -> String { + if group.len() == 0 { + group = DEFAULT_GROUP.to_string(); + } + self.root_path.clone() + PATH_SEPARATOR + group.as_str() } } #[async_trait] impl DynamicConfiguration for EtcdDynamicConfiguration { - fn add_listener(&self, key: String, listener: impl ConfigurationListener) { - todo!() + + async fn add_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send) { + let path = key.clone() + group.as_str(); + let mut rng = rand::thread_rng(); + let watch_id: i64 = rng.gen(); + if !self.watch_listener_map.contains_key(path.as_str()) { + let mut watcher_map = HashMap::new(); + let listener_type = listener.get_type(); + let mut etcd_watcher = EtcdConfigWatcher::new(key.clone(), group, self.watcher, self.stream, watch_id, listener); + etcd_watcher.watch(watch_id); + watcher_map.insert(listener_type, etcd_watcher); + self.watch_listener_map.insert(path, watcher_map); + } else { + let watcher_map = self.watch_listener_map.get_mut(path.as_str()).unwrap(); + let listener_type = listener.get_type(); + let mut etcd_watcher = EtcdConfigWatcher::new(key.clone(), group, self.watcher, self.stream, watch_id, listener); + etcd_watcher.watch(watch_id); + watcher_map.insert(listener_type, etcd_watcher); + } } - fn remove_listener(&self, key: String, listener: impl ConfigurationListener) { - todo!() + async fn remove_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send) { + let path = key + group.as_str(); + let watcher_map = self.watch_listener_map.get_mut(path.as_str()).unwrap(); + if !watcher_map.contains_key(listener.get_type().as_str()) { + return; + } + let watcher = watcher_map.get_mut(listener.get_type().as_str()).unwrap(); + watcher.cancelWatch(); + watcher_map.remove(listener.get_type().as_str()); } async fn get_config(&mut self, key: String, group: String, timeout: i32) -> String { @@ -71,15 +145,72 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { return String::from(""); } - fn get_properties(&self, key: String, group: String, timeout: i32) -> String { - todo!() + async fn get_properties(&mut self, key: String, group: String, timeout: i32) -> String { + let mut path = String::new(); + if group.len() != 0 { + path = group + PATH_SEPARATOR + key.as_str(); + } else { + path = self.url.get_parameter(CONFIG_NAMESPACE_KEY.to_string(), DEFAULT_GROUP.to_string()) + PATH_SEPARATOR + key.as_str(); + } + let resp = self.client.get(key, None).await.unwrap(); + if let Some(kv) = resp.kvs().first() { + return kv.value_str().unwrap().to_string(); + } + return String::from(""); + } + + async fn publish_config(&mut self, key: String, group: String, content: String) -> bool { + let path = self.get_path(key, group); + + // TODO need base64 encoding + + self.client.put(path, content, None).await.unwrap(); + + // TODO consider fix return value type. + true } - fn publish_config(&self, key: String, group: String, content: String) -> bool { - todo!() + async fn get_config_keys(&mut self, group: String) -> HashSet { + let path = self.get_path("".to_string(), group); + let resp = self.client.get("", Some(GetOptions::new().with_prefix())).await.unwrap(); + let mut result = HashSet::new(); + for kv in resp.kvs() { + result.insert(kv.key_str().unwrap().to_string()); + } + result } +} + +pub struct EtcdConfigWatcher { + //pub listener: Box, + pub key: String, + pub group: String, + pub normalized_key: String, + pub watcher: Watcher, + pub stream: WatchStream, + pub watch_id: i64, +} + +impl EtcdConfigWatcher { - fn get_config_keys(&self, group: String) -> HashMap { - todo!() + pub fn new(key: String, group: String, watcher: Watcher, stream: WatchStream, watch_id: i64, listener: impl ConfigurationListener) -> Self { + EtcdConfigWatcher { + //listener, + key, + group, + normalized_key: "".to_string(), // TODO + watcher, + stream, + watch_id, + } } -} \ No newline at end of file + + pub async fn watch(&mut self, watch_id: i64) { + self.watcher.watch(self.key.clone(), Some(WatchOptions::new().with_watch_id(watch_id))).await.unwrap(); + } + + pub fn cancelWatch(&mut self) { + let watch_id = self.watch_id; + self.watcher.cancel_by_id(watch_id); + } +} diff --git a/config_center/src/key.rs b/config_center/src/key.rs new file mode 100644 index 00000000..1fd20f7e --- /dev/null +++ b/config_center/src/key.rs @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const CONFIG_NAMESPACE_KEY: &str = "config-center.namespace"; + +const CONFIG_GROUP_KEY: &str = "config-center.group"; diff --git a/config_center/src/lib.rs b/config_center/src/lib.rs index bf39d8f5..df7f0a98 100644 --- a/config_center/src/lib.rs +++ b/config_center/src/lib.rs @@ -19,8 +19,9 @@ mod dynamic_configuration_factory; mod configuration_listener; mod config_changed_event; mod dynamic_configuration; -mod url; mod etcd_dynamic_configuration; +mod key; +mod url; #[cfg(test)] mod tests { diff --git a/config_center/src/url.rs b/config_center/src/url.rs index f2f3076e..5aa58d47 100644 --- a/config_center/src/url.rs +++ b/config_center/src/url.rs @@ -18,7 +18,14 @@ use std::collections::HashMap; pub struct URL { + pub protocol: String, + pub username: String, + pub password: String, + pub host: String, + pub port: u32, + pub path: String, parameters: HashMap, + methodParameters: HashMap>, } impl URL { From 929741c6b42410f32a390b783dbcea53fbbdb1a2 Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Mon, 18 Jul 2022 17:06:14 +0800 Subject: [PATCH 4/7] Fix(config_center): format code. --- config_center/src/config_changed_event.rs | 2 +- config_center/src/configuration_listener.rs | 2 +- config_center/src/dynamic_configuration.rs | 2 +- config_center/src/dynamic_configuration_factory.rs | 2 +- config_center/src/url.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/config_center/src/config_changed_event.rs b/config_center/src/config_changed_event.rs index 1e6bb6b3..0f16b705 100644 --- a/config_center/src/config_changed_event.rs +++ b/config_center/src/config_changed_event.rs @@ -32,4 +32,4 @@ pub enum ConfigChangeType { ADDED, MODIFIED, DELETED, -} \ No newline at end of file +} diff --git a/config_center/src/configuration_listener.rs b/config_center/src/configuration_listener.rs index 43a5d035..65d352b4 100644 --- a/config_center/src/configuration_listener.rs +++ b/config_center/src/configuration_listener.rs @@ -22,4 +22,4 @@ pub trait ConfigurationListener { fn process(&self, event: ConfigChangedEvent); fn get_type(&self) -> String; -} \ No newline at end of file +} diff --git a/config_center/src/dynamic_configuration.rs b/config_center/src/dynamic_configuration.rs index 3e057c26..118364a8 100644 --- a/config_center/src/dynamic_configuration.rs +++ b/config_center/src/dynamic_configuration.rs @@ -35,4 +35,4 @@ pub trait DynamicConfiguration { async fn publish_config(&mut self, key: String, group: String, content: String) -> bool; async fn get_config_keys(&mut self, group: String) -> HashSet; -} \ No newline at end of file +} diff --git a/config_center/src/dynamic_configuration_factory.rs b/config_center/src/dynamic_configuration_factory.rs index d5d186b8..f53a8b7c 100644 --- a/config_center/src/dynamic_configuration_factory.rs +++ b/config_center/src/dynamic_configuration_factory.rs @@ -20,4 +20,4 @@ use crate::url::URL; pub trait DynamicConfigurationFactory { fn get_dynamic_configuration(&self, url: URL) -> T; -} \ No newline at end of file +} diff --git a/config_center/src/url.rs b/config_center/src/url.rs index 5aa58d47..334a906e 100644 --- a/config_center/src/url.rs +++ b/config_center/src/url.rs @@ -42,4 +42,4 @@ impl URL { } value } -} \ No newline at end of file +} From 6c1777e080cb062fd47034674b67c78ee217070a Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Wed, 20 Jul 2022 11:48:18 +0800 Subject: [PATCH 5/7] Fix(config_center): format code. --- config_center/src/etcd_dynamic_configuration.rs | 4 ++-- config_center/src/url.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs index 5af7cc4a..7b938be0 100644 --- a/config_center/src/etcd_dynamic_configuration.rs +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -53,7 +53,7 @@ impl EtcdDynamicConfiguration { pub async fn new(self, url: URL) -> Self { let mut client = Client::connect(["localhost:2379"], None).await.unwrap(); let mut root_path = String::from(PATH_SEPARATOR); - root_path.push_str(url.get_parameter(CONFIG_NAMESPACE_KEY.to_string(), DEFAULT_GROUP.to_string()).as_str()); + root_path.push_str(url.get_parameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP).as_str()); root_path.push_str("/config"); let (watcher, stream) = client.watch("/", None).await.unwrap(); let watch_listener_map = HashMap::new(); @@ -150,7 +150,7 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { if group.len() != 0 { path = group + PATH_SEPARATOR + key.as_str(); } else { - path = self.url.get_parameter(CONFIG_NAMESPACE_KEY.to_string(), DEFAULT_GROUP.to_string()) + PATH_SEPARATOR + key.as_str(); + path = self.url.get_parameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + key.as_str(); } let resp = self.client.get(key, None).await.unwrap(); if let Some(kv) = resp.kvs().first() { diff --git a/config_center/src/url.rs b/config_center/src/url.rs index 334a906e..9525e837 100644 --- a/config_center/src/url.rs +++ b/config_center/src/url.rs @@ -30,16 +30,16 @@ pub struct URL { impl URL { - pub fn get_parameter(&self, key: String, default_value: String) -> String { - let value = match self.parameters.get(key.as_str()) { - Some(value) => value.clone(), + pub fn get_parameter(&self, key: &str, default_value: &str) -> String { + let value = match self.parameters.get(key) { + Some(value) => value, None => { - default_value.clone() + default_value }, }; if value.is_empty() { - return default_value; + return String::from(default_value) } - value + String::from(value) } } From d7fd6c2f3131a37ca8cec446c4d1a6e33bc97b70 Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Wed, 20 Jul 2022 12:02:48 +0800 Subject: [PATCH 6/7] Fix(config_center): format code. --- config_center/src/dynamic_configuration.rs | 12 ++--- .../src/etcd_dynamic_configuration.rs | 44 +++++++++---------- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/config_center/src/dynamic_configuration.rs b/config_center/src/dynamic_configuration.rs index 118364a8..188feb07 100644 --- a/config_center/src/dynamic_configuration.rs +++ b/config_center/src/dynamic_configuration.rs @@ -22,17 +22,17 @@ use async_trait::async_trait; #[async_trait] pub trait DynamicConfiguration { - async fn add_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send); + async fn add_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send); - async fn remove_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send); + async fn remove_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send); // TODO how to override - async fn get_config(&mut self, key: String, group: String, timeout: i32) -> String; + async fn get_config(&mut self, key: &str, group: &str, timeout: i32) -> String; - async fn get_properties(&mut self, key: String, group: String, timeout: i32) -> String; + async fn get_properties(&mut self, key: &str, group: &str, timeout: i32) -> String; - async fn publish_config(&mut self, key: String, group: String, content: String) -> bool; + async fn publish_config(&mut self, key: &str, group: &str, content: &str) -> bool; - async fn get_config_keys(&mut self, group: String) -> HashSet; + async fn get_config_keys(&mut self, group: &str) -> HashSet; } diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs index 7b938be0..8336df0a 100644 --- a/config_center/src/etcd_dynamic_configuration.rs +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -15,7 +15,6 @@ * limitations under the License. */ -use std::borrow::BorrowMut; use std::collections::{HashMap, HashSet}; use etcd_client::{Client, GetOptions, Watcher, WatchOptions, WatchStream}; use crate::configuration_listener::ConfigurationListener; @@ -85,46 +84,45 @@ impl EtcdDynamicConfiguration { } } - pub fn get_path(&self, key: String, group: String) -> String { - if key.len() == 0 { + pub fn get_path(&self, key: &str, group: &str) -> String { + if key.is_empty() { return self.build_path(group); } - self.build_path(group.clone()) + PATH_SEPARATOR + key.as_str() + self.build_path(group) + PATH_SEPARATOR + key } - pub fn build_path(&self, mut group: String) -> String { - if group.len() == 0 { - group = DEFAULT_GROUP.to_string(); + pub fn build_path(&self, mut group: &str) -> String { + if group.is_empty() { + group = DEFAULT_GROUP; } - self.root_path.clone() + PATH_SEPARATOR + group.as_str() + self.root_path.clone() + PATH_SEPARATOR + group } } #[async_trait] impl DynamicConfiguration for EtcdDynamicConfiguration { - async fn add_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send) { - let path = key.clone() + group.as_str(); - let mut rng = rand::thread_rng(); - let watch_id: i64 = rng.gen(); + async fn add_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send) { + let path = self.get_path(key, group); + let watch_id = rand::thread_rng().gen(); if !self.watch_listener_map.contains_key(path.as_str()) { let mut watcher_map = HashMap::new(); let listener_type = listener.get_type(); - let mut etcd_watcher = EtcdConfigWatcher::new(key.clone(), group, self.watcher, self.stream, watch_id, listener); + let mut etcd_watcher = EtcdConfigWatcher::new(key.to_string(), group.to_string(), self.watcher, self.stream, watch_id, listener); etcd_watcher.watch(watch_id); watcher_map.insert(listener_type, etcd_watcher); self.watch_listener_map.insert(path, watcher_map); } else { let watcher_map = self.watch_listener_map.get_mut(path.as_str()).unwrap(); let listener_type = listener.get_type(); - let mut etcd_watcher = EtcdConfigWatcher::new(key.clone(), group, self.watcher, self.stream, watch_id, listener); + let mut etcd_watcher = EtcdConfigWatcher::new(key.to_string(), group.to_string(), self.watcher, self.stream, watch_id, listener); etcd_watcher.watch(watch_id); watcher_map.insert(listener_type, etcd_watcher); } } - async fn remove_listener(mut self, key: String, group: String, listener: impl ConfigurationListener + std::marker::Send) { - let path = key + group.as_str(); + async fn remove_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send) { + let path = self.get_path(key, group); let watcher_map = self.watch_listener_map.get_mut(path.as_str()).unwrap(); if !watcher_map.contains_key(listener.get_type().as_str()) { return; @@ -134,7 +132,7 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { watcher_map.remove(listener.get_type().as_str()); } - async fn get_config(&mut self, key: String, group: String, timeout: i32) -> String { + async fn get_config(&mut self, key: &str, group: &str, timeout: i32) -> String { if key.is_empty() { return String::from(""); } @@ -145,12 +143,12 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { return String::from(""); } - async fn get_properties(&mut self, key: String, group: String, timeout: i32) -> String { + async fn get_properties(&mut self, key: &str, group: &str, timeout: i32) -> String { let mut path = String::new(); if group.len() != 0 { - path = group + PATH_SEPARATOR + key.as_str(); + path = group.to_string() + PATH_SEPARATOR + key; } else { - path = self.url.get_parameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + key.as_str(); + path = self.url.get_parameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + key; } let resp = self.client.get(key, None).await.unwrap(); if let Some(kv) = resp.kvs().first() { @@ -159,7 +157,7 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { return String::from(""); } - async fn publish_config(&mut self, key: String, group: String, content: String) -> bool { + async fn publish_config(&mut self, key: &str, group: &str, content: &str) -> bool { let path = self.get_path(key, group); // TODO need base64 encoding @@ -170,8 +168,8 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { true } - async fn get_config_keys(&mut self, group: String) -> HashSet { - let path = self.get_path("".to_string(), group); + async fn get_config_keys(&mut self, group: &str) -> HashSet { + let path = self.get_path("", group); let resp = self.client.get("", Some(GetOptions::new().with_prefix())).await.unwrap(); let mut result = HashSet::new(); for kv in resp.kvs() { From 286831273fd107b8337db72a63e3e25423c0b921 Mon Sep 17 00:00:00 2001 From: Zonglei Dong Date: Thu, 18 Aug 2022 20:36:30 +0800 Subject: [PATCH 7/7] refactor(config): refactor config. --- config_center/src/etcd_dynamic_configuration.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs index 8336df0a..ce668e89 100644 --- a/config_center/src/etcd_dynamic_configuration.rs +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -133,14 +133,12 @@ impl DynamicConfiguration for EtcdDynamicConfiguration { } async fn get_config(&mut self, key: &str, group: &str, timeout: i32) -> String { - if key.is_empty() { - return String::from(""); - } - let resp = self.client.get(key, None).await.unwrap(); + let path = self.get_path(key, group); + let resp = self.client.get(path, None).await.unwrap(); if let Some(kv) = resp.kvs().first() { return kv.value_str().unwrap().to_string(); } - return String::from(""); + return String::new(); } async fn get_properties(&mut self, key: &str, group: &str, timeout: i32) -> String {