From 22a1ce3a63d88bea354855a045be84ef7d8eab66 Mon Sep 17 00:00:00 2001 From: Oana Barbu Date: Tue, 14 Sep 2021 16:42:12 +0300 Subject: [PATCH 1/6] Subscription does not send any updates --- rpc/Cargo.toml | 4 +++ rpc/src/geode.rs | 93 ++++++++++++++++++++++++++++++++++++++++++++++-- rpc/src/lib.rs | 3 +- 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 9030223..b81371e 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -39,6 +39,10 @@ hex = '0.4.3' sp-std = '3.0.0' frame-system-rpc-runtime-api = { default-features = false, version = '3.0.0' } sp-core = { features = ["full_crypto"], version = '3.0.0' } +log = "0.4.8" +parking_lot = "0.11.1" +sp-utils = '3.0.0' +futures = "0.3.9" # local dependencies automata-primitives = { path = "../primitives" } diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index ccd8a79..ac03fd1 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -9,6 +9,19 @@ use sp_runtime::{traits::Block as BlockT, RuntimeDebug}; use sp_std::{collections::btree_map::BTreeMap, prelude::*}; use std::sync::Arc; +use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; +use log::warn; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use parking_lot::Mutex; +use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt}; +use jsonrpc_core::futures::{ + sink::Sink as Sink01, + stream::Stream as Stream01, + future::Future as Future01, + future::Executor as Executor01, +}; +use sp_runtime::print; + // #[cfg(feature = "std")] use serde::{Deserialize, Serialize}; @@ -17,6 +30,9 @@ const RUNTIME_ERROR: i64 = 1; #[rpc] /// Geode RPC methods pub trait GeodeServer { + /// RPC Metadata + type Metadata; + /// return the registered geode list #[rpc(name = "registered_geodes")] fn registered_geodes(&self) -> Result>>; @@ -29,6 +45,22 @@ pub trait GeodeServer { /// Return the current state of a geode #[rpc(name = "geode_state")] fn geode_state(&self, geode: [u8; 32]) -> Result>; + + /// Geode state subscription + #[pubsub( + subscription = "geode_state", + subscribe, + name = "geode_subscribeState", + )] + fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber); + + /// Unsubscribe from geode state subscription. + #[pubsub( + subscription = "geode_state", + unsubscribe, + name = "geode_unsubscribeState" + )] + fn unsubscribe_geode_state(&self, _: Option, _: SubscriptionId) -> Result; } /// The geode struct shows its status @@ -71,13 +103,30 @@ impl From> for WrappedGeode { /// An implementation of geode specific RPC methods. pub struct GeodeApi { client: Arc, + subscribers: SharedGeodeStateSenders, + manager: SubscriptionManager, } +type GeodeStateStream = TracingUnboundedReceiver; +type SharedGeodeStateSenders = Arc>>; +type GeodeStateSender = TracingUnboundedSender; + impl GeodeApi { /// Create new `Geode` with the given reference to the client. - pub fn new(client: Arc) -> Self { - GeodeApi { client } + pub fn new(client: Arc, manager: SubscriptionManager,) -> Self { + GeodeApi { + client, + subscribers: Arc::new(Mutex::new(vec![])), + manager, + } } + + /// Subscribe to a channel through which updates are sent + pub fn subscribe(&self) -> GeodeStateStream { + let (sender, receiver) = tracing_unbounded("mpsc_geode_state_notification_stream"); + self.subscribers.lock().push(sender); + receiver + } } impl GeodeServer<::Hash> for GeodeApi @@ -86,6 +135,8 @@ where C: ProvideRuntimeApi + HeaderBackend, C::Api: GeodeRuntimeApi, { + type Metadata = sc_rpc::Metadata; + /// get registered geode list fn registered_geodes(&self) -> Result>> { let api = self.client.runtime_api(); @@ -154,4 +205,42 @@ where Ok(geode_state) } + + fn subscribe_geode_state( + &self, + _metadata: Self::Metadata, + subscriber: Subscriber, + ) { + print("subscribe_geode_state called"); + let stream = self.subscribe() + .map(|x| Ok::<_,()>(String::from(x))) + .map_err(|e| warn!("Notification stream error: {:?}", e)) + .compat(); + + self.manager.add(subscriber, |sink| { + let stream = stream.map(|res| Ok(res)); + sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream) + .map(|_| ()) + }); + // let stream = self + // // .justification_stream + // .subscribe_custom() + // .map(|x| Ok(Ok::<_, jsonrpc_core::Error>(String::from(x)))); + + // self.manager.add(subscriber, |sink| { + // stream + // .forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))) + // .map(|_| ()) + // }); + } + + fn unsubscribe_geode_state( + &self, + _metadata: Option, + id: SubscriptionId, + ) -> Result { + print("unsubscribe_geode_state called"); + Ok(self.manager.cancel(id)) + } } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 74b2e86..135db49 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -226,7 +226,7 @@ where network, SubscriptionManager::::with_id_provider( HexEncodedIdProvider::default(), - Arc::new(subscription_task_executor), + Arc::new(subscription_task_executor.clone()), ), ))); @@ -236,6 +236,7 @@ where io.extend_with(GeodeServer::to_delegate(geode::GeodeApi::new( client.clone(), + SubscriptionManager::new(Arc::new(subscription_task_executor)), ))); io.extend_with(TransferServer::to_delegate(transfer::TransferApi::new( From 833795edf040d2c577a552a556cba422330624f0 Mon Sep 17 00:00:00 2001 From: Oana Barbu Date: Tue, 14 Sep 2021 22:10:58 +0300 Subject: [PATCH 2/6] Subscribers receive initial geode state; logic of the subscription implemented --- rpc/src/geode.rs | 122 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 94 insertions(+), 28 deletions(-) diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index ac03fd1..58267f2 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -6,7 +6,7 @@ use pallet_geode::{Geode, GeodeState}; use sc_light::blockchain::BlockchainHeaderBackend as HeaderBackend; use sp_api::ProvideRuntimeApi; use sp_runtime::{traits::Block as BlockT, RuntimeDebug}; -use sp_std::{collections::btree_map::BTreeMap, prelude::*}; +use sp_std::{collections::{btree_map::BTreeMap, btree_set::BTreeSet}, prelude::*}; use std::sync::Arc; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; @@ -15,6 +15,7 @@ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use parking_lot::Mutex; use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt}; use jsonrpc_core::futures::{ + stream, sink::Sink as Sink01, stream::Stream as Stream01, future::Future as Future01, @@ -27,6 +28,75 @@ use serde::{Deserialize, Serialize}; const RUNTIME_ERROR: i64 = 1; +type GeodeId = [u8; 32]; +type SubscriberId = u64; + +pub struct GeodeStateNotifications { + next_id: SubscriberId, + listeners: BTreeMap>, + sinks: BTreeMap, +} + +impl GeodeStateNotifications { + pub fn new() -> Self { + GeodeStateNotifications { + next_id: 0, + listeners: BTreeMap::new(), + sinks: BTreeMap::new(), + } + } + + // Start subscribtion for changes in state of a particular geode + pub fn subscribe(&mut self, geode_id: GeodeId) -> GeodeStateStream { + self.next_id += 1; + let subscriber_id = self.next_id; + + match self.listeners.get_mut(&geode_id) { + Some(subscribers) => { + subscribers.insert(subscriber_id); + }, + None => { + let mut subscribers = BTreeSet::new(); + subscribers.insert(subscriber_id); + self.listeners.insert(geode_id, subscribers); + } + }; + + // insert sink + let (tx, rx) = tracing_unbounded("mpsc_geode_state_notification_items"); + self.sinks.insert(subscriber_id, tx); + + + // if let Some(m) = self.metrics.as_ref() { + // m.with_label_values(&[&"added"]).inc(); + // } + + rx + } + + // Trigger notification to all listeners + pub fn trigger(&mut self, id: GeodeId, new_state: GeodeState) { + // get the subscribers interested in the geode whose state is changing + let subscribers = match self.listeners.get_mut(&id) { + Some(subscribers) => subscribers, + None => return, + }; + let mut to_remove = vec!(); + for sub_id in subscribers.iter() { + let sink = self.sinks.get_mut(&sub_id).unwrap(); + match sink.unbounded_send(new_state.clone()) { + Ok(_) => (), + Err(_) => to_remove.push(sub_id.clone()), + }; + } + + for id in to_remove { + subscribers.remove(&id); + self.sinks.remove(&id); + } + } +} + #[rpc] /// Geode RPC methods pub trait GeodeServer { @@ -52,7 +122,7 @@ pub trait GeodeServer { subscribe, name = "geode_subscribeState", )] - fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber); + fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber, id: GeodeId); /// Unsubscribe from geode state subscription. #[pubsub( @@ -103,30 +173,22 @@ impl From> for WrappedGeode { /// An implementation of geode specific RPC methods. pub struct GeodeApi { client: Arc, - subscribers: SharedGeodeStateSenders, + notifications: Mutex, manager: SubscriptionManager, } -type GeodeStateStream = TracingUnboundedReceiver; -type SharedGeodeStateSenders = Arc>>; -type GeodeStateSender = TracingUnboundedSender; +type GeodeStateStream = TracingUnboundedReceiver; +type GeodeStateSender = TracingUnboundedSender; impl GeodeApi { /// Create new `Geode` with the given reference to the client. pub fn new(client: Arc, manager: SubscriptionManager,) -> Self { GeodeApi { client, - subscribers: Arc::new(Mutex::new(vec![])), + notifications: Mutex::new(GeodeStateNotifications::new()), manager, } } - - /// Subscribe to a channel through which updates are sent - pub fn subscribe(&self) -> GeodeStateStream { - let (sender, receiver) = tracing_unbounded("mpsc_geode_state_notification_stream"); - self.subscribers.lock().push(sender); - receiver - } } impl GeodeServer<::Hash> for GeodeApi @@ -209,30 +271,34 @@ where fn subscribe_geode_state( &self, _metadata: Self::Metadata, - subscriber: Subscriber, + subscriber: Subscriber, + id: GeodeId, ) { print("subscribe_geode_state called"); - let stream = self.subscribe() - .map(|x| Ok::<_,()>(String::from(x))) + let initial = match self.geode_state(id.clone()) { + Ok(state) => match state { + Some(initial) => Ok(initial), + None => { + Ok(GeodeState::Registered) + // let _ = subscriber.reject(Error::invalid_params("no such geode")); + // return + }, + } + Err(e) => Err(e), + }; + + let stream = self.notifications.lock().subscribe(id) + .map(|x| Ok::<_,()>(GeodeState::from(x))) .map_err(|e| warn!("Notification stream error: {:?}", e)) .compat(); self.manager.add(subscriber, |sink| { let stream = stream.map(|res| Ok(res)); sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(stream) + .send_all(stream::iter_result(vec![Ok(initial)]) + .chain(stream)) .map(|_| ()) }); - // let stream = self - // // .justification_stream - // .subscribe_custom() - // .map(|x| Ok(Ok::<_, jsonrpc_core::Error>(String::from(x)))); - - // self.manager.add(subscriber, |sink| { - // stream - // .forward(sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))) - // .map(|_| ()) - // }); } fn unsubscribe_geode_state( From e4820d4e7abec30799f97fb86cc89fd79b452541 Mon Sep 17 00:00:00 2001 From: Oana Barbu Date: Tue, 21 Sep 2021 20:29:41 +0300 Subject: [PATCH 3/6] Implement geode state subscription on top of state_subscribeStorage --- rpc/Cargo.toml | 3 +- rpc/src/geode.rs | 161 +++++++++++++++++++++-------------------------- 2 files changed, 72 insertions(+), 92 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index b81371e..98423ec 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -40,9 +40,8 @@ sp-std = '3.0.0' frame-system-rpc-runtime-api = { default-features = false, version = '3.0.0' } sp-core = { features = ["full_crypto"], version = '3.0.0' } log = "0.4.8" -parking_lot = "0.11.1" -sp-utils = '3.0.0' futures = "0.3.9" +codec = { default-features = false, features = ['derive'], package = 'parity-scale-codec', version = '2.0.0' } # local dependencies automata-primitives = { path = "../primitives" } diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index 58267f2..e85b64c 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -6,96 +6,30 @@ use pallet_geode::{Geode, GeodeState}; use sc_light::blockchain::BlockchainHeaderBackend as HeaderBackend; use sp_api::ProvideRuntimeApi; use sp_runtime::{traits::Block as BlockT, RuntimeDebug}; -use sp_std::{collections::{btree_map::BTreeMap, btree_set::BTreeSet}, prelude::*}; +use sp_std::{collections::btree_map::BTreeMap, prelude::*}; use std::sync::Arc; +use sp_core::{twox_128, blake2_128}; +use codec::{Encode, Decode}; +use sc_client_api::BlockchainEvents; +use sp_core::storage::{StorageKey}; +use sc_client_api::StorageChangeSet; + use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; use log::warn; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use parking_lot::Mutex; -use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt}; +use futures::{TryStreamExt, StreamExt}; use jsonrpc_core::futures::{ stream, sink::Sink as Sink01, stream::Stream as Stream01, future::Future as Future01, - future::Executor as Executor01, }; -use sp_runtime::print; -// #[cfg(feature = "std")] use serde::{Deserialize, Serialize}; const RUNTIME_ERROR: i64 = 1; type GeodeId = [u8; 32]; -type SubscriberId = u64; - -pub struct GeodeStateNotifications { - next_id: SubscriberId, - listeners: BTreeMap>, - sinks: BTreeMap, -} - -impl GeodeStateNotifications { - pub fn new() -> Self { - GeodeStateNotifications { - next_id: 0, - listeners: BTreeMap::new(), - sinks: BTreeMap::new(), - } - } - - // Start subscribtion for changes in state of a particular geode - pub fn subscribe(&mut self, geode_id: GeodeId) -> GeodeStateStream { - self.next_id += 1; - let subscriber_id = self.next_id; - - match self.listeners.get_mut(&geode_id) { - Some(subscribers) => { - subscribers.insert(subscriber_id); - }, - None => { - let mut subscribers = BTreeSet::new(); - subscribers.insert(subscriber_id); - self.listeners.insert(geode_id, subscribers); - } - }; - - // insert sink - let (tx, rx) = tracing_unbounded("mpsc_geode_state_notification_items"); - self.sinks.insert(subscriber_id, tx); - - - // if let Some(m) = self.metrics.as_ref() { - // m.with_label_values(&[&"added"]).inc(); - // } - - rx - } - - // Trigger notification to all listeners - pub fn trigger(&mut self, id: GeodeId, new_state: GeodeState) { - // get the subscribers interested in the geode whose state is changing - let subscribers = match self.listeners.get_mut(&id) { - Some(subscribers) => subscribers, - None => return, - }; - let mut to_remove = vec!(); - for sub_id in subscribers.iter() { - let sink = self.sinks.get_mut(&sub_id).unwrap(); - match sink.unbounded_send(new_state.clone()) { - Ok(_) => (), - Err(_) => to_remove.push(sub_id.clone()), - }; - } - - for id in to_remove { - subscribers.remove(&id); - self.sinks.remove(&id); - } - } -} #[rpc] /// Geode RPC methods @@ -173,19 +107,14 @@ impl From> for WrappedGeode { /// An implementation of geode specific RPC methods. pub struct GeodeApi { client: Arc, - notifications: Mutex, manager: SubscriptionManager, } -type GeodeStateStream = TracingUnboundedReceiver; -type GeodeStateSender = TracingUnboundedSender; - impl GeodeApi { /// Create new `Geode` with the given reference to the client. pub fn new(client: Arc, manager: SubscriptionManager,) -> Self { GeodeApi { client, - notifications: Mutex::new(GeodeStateNotifications::new()), manager, } } @@ -194,7 +123,7 @@ impl GeodeApi { impl GeodeServer<::Hash> for GeodeApi where C: Send + Sync + 'static, - C: ProvideRuntimeApi + HeaderBackend, + C: ProvideRuntimeApi + HeaderBackend + BlockchainEvents, C::Api: GeodeRuntimeApi, { type Metadata = sc_rpc::Metadata; @@ -274,23 +203,34 @@ where subscriber: Subscriber, id: GeodeId, ) { - print("subscribe_geode_state called"); + // get the current state of the geode + // if the geode does not exist, reject the subscription let initial = match self.geode_state(id.clone()) { Ok(state) => match state { Some(initial) => Ok(initial), None => { - Ok(GeodeState::Registered) - // let _ = subscriber.reject(Error::invalid_params("no such geode")); - // return + let _ = subscriber.reject(Error::invalid_params("no such geode")); + return }, } Err(e) => Err(e), }; - - let stream = self.notifications.lock().subscribe(id) - .map(|x| Ok::<_,()>(GeodeState::from(x))) - .map_err(|e| warn!("Notification stream error: {:?}", e)) - .compat(); + let key: StorageKey = StorageKey(build_storage_key(id.clone())); + let keys = Into::>>::into(vec!(key)); + let stream = match self.client.storage_changes_notification_stream( + keys.as_ref().map(|x| &**x), + None + ) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(client_err(err).into()); + return; + }, + }; + + let stream = stream + .map(|(_block, changes)| Ok::<_, ()>(get_geode_state(changes))) + .compat(); self.manager.add(subscriber, |sink| { let stream = stream.map(|res| Ok(res)); @@ -306,7 +246,48 @@ where _metadata: Option, id: SubscriptionId, ) -> Result { - print("unsubscribe_geode_state called"); Ok(self.manager.cancel(id)) } } + +fn build_storage_key(id: GeodeId) -> Vec { + let geode_module = twox_128(b"GeodeModule"); + let geodes = twox_128(b"Geodes"); + let geode: AccountId = id.into(); + let geode = blake2_128_concat(&geode.encode()); + + let mut param = vec!(); + param.extend(geode_module); + param.extend(geodes); + param.extend(geode); + param +} + +fn blake2_128_concat(d: &[u8]) -> Vec { + let mut v = blake2_128(d).to_vec(); + v.extend_from_slice(d); + v +} + +fn get_geode_state(changes: StorageChangeSet) -> GeodeState { + for (_, _, data) in changes.iter() { + match data { + Some(data) => { + let mut value: &[u8] = &data.0.clone(); + match GeodeState::decode(&mut value) { + Ok(state) => { + return state; + }, + Err(_) => warn!("cannot decode GeodeState") + } + }, + None => warn!("data was none"), + }; + } + GeodeState::Null +} + +fn client_err(_: sp_blockchain::Error) -> Error { + Error::invalid_request() + // Client(Box::new(err)) +} From f6d74e9231fc93fb976141ff8f6120e1e3654ada Mon Sep 17 00:00:00 2001 From: Oana Barbu Date: Wed, 22 Sep 2021 10:53:18 +0300 Subject: [PATCH 4/6] Geode subscriptions are handled by a dedicated executor --- Cargo.lock | 3 ++ node/src/service.rs | 7 ++++- rpc/src/geode.rs | 77 ++++++++++++++++++++++----------------------- rpc/src/lib.rs | 5 +-- 4 files changed, 50 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db7784c..4271df1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,15 +473,18 @@ dependencies = [ "fc-rpc-core", "fp-rpc", "frame-system-rpc-runtime-api", + "futures 0.3.16", "hex", "jsonrpc-core 15.1.0", "jsonrpc-core-client 15.1.0", "jsonrpc-derive 15.1.0", "jsonrpc-pubsub 15.1.0", + "log", "pallet-ethereum", "pallet-geode", "pallet-transaction-payment-rpc", "pallet-transfer", + "parity-scale-codec", "sc-client-api", "sc-consensus-babe", "sc-consensus-babe-rpc", diff --git a/node/src/service.rs b/node/src/service.rs index c2e364d..a2cf8a4 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -235,6 +235,7 @@ pub fn new_full(mut config: Configuration) -> Result let is_authority = role.is_authority(); let subscription_task_executor = sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); + let geode_subscription_executor = sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); let babe_config = babe_link.config().clone(); let shared_epoch_changes = babe_link.epoch_changes().clone(); let justification_stream = grandpa_link.justification_stream(); @@ -277,7 +278,11 @@ pub fn new_full(mut config: Configuration) -> Result }, }; - automata_rpc::create_full(deps, subscription_task_executor.clone()) + automata_rpc::create_full( + deps, + subscription_task_executor.clone(), + geode_subscription_executor.clone(), + ) }) }; diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index e85b64c..49b8ebe 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -20,9 +20,9 @@ use log::warn; use futures::{TryStreamExt, StreamExt}; use jsonrpc_core::futures::{ stream, - sink::Sink as Sink01, - stream::Stream as Stream01, - future::Future as Future01, + sink::Sink as Sink01, + stream::Stream as Stream01, + future::Future as Future01, }; use serde::{Deserialize, Serialize}; @@ -35,7 +35,7 @@ type GeodeId = [u8; 32]; /// Geode RPC methods pub trait GeodeServer { /// RPC Metadata - type Metadata; + type Metadata; /// return the registered geode list #[rpc(name = "registered_geodes")] @@ -198,11 +198,11 @@ where } fn subscribe_geode_state( - &self, - _metadata: Self::Metadata, - subscriber: Subscriber, + &self, + _metadata: Self::Metadata, + subscriber: Subscriber, id: GeodeId, - ) { + ) { // get the current state of the geode // if the geode does not exist, reject the subscription let initial = match self.geode_state(id.clone()) { @@ -210,44 +210,44 @@ where Some(initial) => Ok(initial), None => { let _ = subscriber.reject(Error::invalid_params("no such geode")); - return + return }, } Err(e) => Err(e), }; let key: StorageKey = StorageKey(build_storage_key(id.clone())); let keys = Into::>>::into(vec!(key)); - let stream = match self.client.storage_changes_notification_stream( - keys.as_ref().map(|x| &**x), - None - ) { - Ok(stream) => stream, - Err(err) => { - let _ = subscriber.reject(client_err(err).into()); - return; - }, - }; + let stream = match self.client.storage_changes_notification_stream( + keys.as_ref().map(|x| &**x), + None + ) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(client_err(err).into()); + return; + }, + }; let stream = stream .map(|(_block, changes)| Ok::<_, ()>(get_geode_state(changes))) .compat(); self.manager.add(subscriber, |sink| { - let stream = stream.map(|res| Ok(res)); - sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(stream::iter_result(vec![Ok(initial)]) + let stream = stream.map(|res| Ok(res)); + sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream::iter_result(vec![Ok(initial)]) .chain(stream)) - .map(|_| ()) - }); - } + .map(|_| ()) + }); + } - fn unsubscribe_geode_state( - &self, - _metadata: Option, - id: SubscriptionId, - ) -> Result { - Ok(self.manager.cancel(id)) - } + fn unsubscribe_geode_state( + &self, + _metadata: Option, + id: SubscriptionId, + ) -> Result { + Ok(self.manager.cancel(id)) + } } fn build_storage_key(id: GeodeId) -> Vec { @@ -264,9 +264,9 @@ fn build_storage_key(id: GeodeId) -> Vec { } fn blake2_128_concat(d: &[u8]) -> Vec { - let mut v = blake2_128(d).to_vec(); - v.extend_from_slice(d); - v + let mut v = blake2_128(d).to_vec(); + v.extend_from_slice(d); + v } fn get_geode_state(changes: StorageChangeSet) -> GeodeState { @@ -278,16 +278,15 @@ fn get_geode_state(changes: StorageChangeSet) -> GeodeState { Ok(state) => { return state; }, - Err(_) => warn!("cannot decode GeodeState") + Err(_) => warn!("unable to decode GeodeState") } }, - None => warn!("data was none"), + None => warn!("empty change set"), }; } GeodeState::Null } fn client_err(_: sp_blockchain::Error) -> Error { - Error::invalid_request() - // Client(Box::new(err)) + Error::invalid_request() } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 135db49..f881bf9 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -90,6 +90,7 @@ pub struct FullDeps { pub fn create_full( deps: FullDeps, subscription_task_executor: SubscriptionTaskExecutor, + geode_subscription_executor: SubscriptionTaskExecutor, ) -> jsonrpc_core::IoHandler where BE: Backend + 'static, @@ -226,7 +227,7 @@ where network, SubscriptionManager::::with_id_provider( HexEncodedIdProvider::default(), - Arc::new(subscription_task_executor.clone()), + Arc::new(subscription_task_executor), ), ))); @@ -236,7 +237,7 @@ where io.extend_with(GeodeServer::to_delegate(geode::GeodeApi::new( client.clone(), - SubscriptionManager::new(Arc::new(subscription_task_executor)), + SubscriptionManager::new(Arc::new(geode_subscription_executor)), ))); io.extend_with(TransferServer::to_delegate(transfer::TransferApi::new( From 2a6371bcd6fb2745d702bd3aafdef75a7fca9331 Mon Sep 17 00:00:00 2001 From: Oana Barbu Date: Wed, 22 Sep 2021 12:01:20 +0300 Subject: [PATCH 5/6] Fix formatting --- node/src/service.rs | 5 +-- rpc/src/geode.rs | 76 ++++++++++++++++++++------------------------- 2 files changed, 36 insertions(+), 45 deletions(-) diff --git a/node/src/service.rs b/node/src/service.rs index a2cf8a4..2c3b1ba 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -235,7 +235,8 @@ pub fn new_full(mut config: Configuration) -> Result let is_authority = role.is_authority(); let subscription_task_executor = sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); - let geode_subscription_executor = sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); + let geode_subscription_executor = + sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); let babe_config = babe_link.config().clone(); let shared_epoch_changes = babe_link.epoch_changes().clone(); let justification_stream = grandpa_link.justification_stream(); @@ -279,7 +280,7 @@ pub fn new_full(mut config: Configuration) -> Result }; automata_rpc::create_full( - deps, + deps, subscription_task_executor.clone(), geode_subscription_executor.clone(), ) diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index 49b8ebe..5fe3588 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -9,21 +9,18 @@ use sp_runtime::{traits::Block as BlockT, RuntimeDebug}; use sp_std::{collections::btree_map::BTreeMap, prelude::*}; use std::sync::Arc; -use sp_core::{twox_128, blake2_128}; -use codec::{Encode, Decode}; +use codec::{Decode, Encode}; use sc_client_api::BlockchainEvents; -use sp_core::storage::{StorageKey}; use sc_client_api::StorageChangeSet; +use sp_core::storage::StorageKey; +use sp_core::{blake2_128, twox_128}; -use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; -use log::warn; -use futures::{TryStreamExt, StreamExt}; +use futures::{StreamExt, TryStreamExt}; use jsonrpc_core::futures::{ - stream, - sink::Sink as Sink01, - stream::Stream as Stream01, - future::Future as Future01, + future::Future as Future01, sink::Sink as Sink01, stream, stream::Stream as Stream01, }; +use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId}; +use log::warn; use serde::{Deserialize, Serialize}; @@ -51,20 +48,17 @@ pub trait GeodeServer { fn geode_state(&self, geode: [u8; 32]) -> Result>; /// Geode state subscription + #[pubsub(subscription = "geode_state", subscribe, name = "geode_subscribeState")] + fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber, id: GeodeId); + + /// Unsubscribe from geode state subscription. #[pubsub( subscription = "geode_state", - subscribe, - name = "geode_subscribeState", - )] - fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber, id: GeodeId); - - /// Unsubscribe from geode state subscription. - #[pubsub( - subscription = "geode_state", - unsubscribe, - name = "geode_unsubscribeState" - )] - fn unsubscribe_geode_state(&self, _: Option, _: SubscriptionId) -> Result; + unsubscribe, + name = "geode_unsubscribeState" + )] + fn unsubscribe_geode_state(&self, _: Option, _: SubscriptionId) + -> Result; } /// The geode struct shows its status @@ -112,11 +106,8 @@ pub struct GeodeApi { impl GeodeApi { /// Create new `Geode` with the given reference to the client. - pub fn new(client: Arc, manager: SubscriptionManager,) -> Self { - GeodeApi { - client, - manager, - } + pub fn new(client: Arc, manager: SubscriptionManager) -> Self { + GeodeApi { client, manager } } } @@ -210,33 +201,32 @@ where Some(initial) => Ok(initial), None => { let _ = subscriber.reject(Error::invalid_params("no such geode")); - return - }, - } + return; + } + }, Err(e) => Err(e), }; let key: StorageKey = StorageKey(build_storage_key(id.clone())); - let keys = Into::>>::into(vec!(key)); - let stream = match self.client.storage_changes_notification_stream( - keys.as_ref().map(|x| &**x), - None - ) { + let keys = Into::>>::into(vec![key]); + let stream = match self + .client + .storage_changes_notification_stream(keys.as_ref().map(|x| &**x), None) + { Ok(stream) => stream, Err(err) => { let _ = subscriber.reject(client_err(err).into()); return; - }, + } }; let stream = stream .map(|(_block, changes)| Ok::<_, ()>(get_geode_state(changes))) - .compat(); + .compat(); self.manager.add(subscriber, |sink| { let stream = stream.map(|res| Ok(res)); sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(stream::iter_result(vec![Ok(initial)]) - .chain(stream)) + .send_all(stream::iter_result(vec![Ok(initial)]).chain(stream)) .map(|_| ()) }); } @@ -256,7 +246,7 @@ fn build_storage_key(id: GeodeId) -> Vec { let geode: AccountId = id.into(); let geode = blake2_128_concat(&geode.encode()); - let mut param = vec!(); + let mut param = vec![]; param.extend(geode_module); param.extend(geodes); param.extend(geode); @@ -277,10 +267,10 @@ fn get_geode_state(changes: StorageChangeSet) -> GeodeState { match GeodeState::decode(&mut value) { Ok(state) => { return state; - }, - Err(_) => warn!("unable to decode GeodeState") + } + Err(_) => warn!("unable to decode GeodeState"), } - }, + } None => warn!("empty change set"), }; } From 62474c79896555675b63d6c03e8d77955682bab8 Mon Sep 17 00:00:00 2001 From: Oana Barbu Date: Thu, 23 Sep 2021 21:13:28 +0300 Subject: [PATCH 6/6] Fix decoding of GeodeState --- rpc/src/geode.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index 5fe3588..94d6abd 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -15,10 +15,8 @@ use sc_client_api::StorageChangeSet; use sp_core::storage::StorageKey; use sp_core::{blake2_128, twox_128}; -use futures::{StreamExt, TryStreamExt}; -use jsonrpc_core::futures::{ - future::Future as Future01, sink::Sink as Sink01, stream, stream::Stream as Stream01, -}; +use futures::{future, StreamExt, TryStreamExt}; +use jsonrpc_core::futures::{future::Future, sink::Sink, stream, stream::Stream}; use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId}; use log::warn; @@ -220,13 +218,16 @@ where }; let stream = stream - .map(|(_block, changes)| Ok::<_, ()>(get_geode_state(changes))) + .filter_map(move |(_block, changes)| match get_geode_state(changes) { + Ok(state) => future::ready(Some(Ok::<_, ()>(Ok(state)))), + Err(_) => future::ready(None), + }) .compat(); self.manager.add(subscriber, |sink| { - let stream = stream.map(|res| Ok(res)); sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) .send_all(stream::iter_result(vec![Ok(initial)]).chain(stream)) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) .map(|_| ()) }); } @@ -259,22 +260,22 @@ fn blake2_128_concat(d: &[u8]) -> Vec { v } -fn get_geode_state(changes: StorageChangeSet) -> GeodeState { +fn get_geode_state(changes: StorageChangeSet) -> Result { for (_, _, data) in changes.iter() { match data { Some(data) => { let mut value: &[u8] = &data.0.clone(); - match GeodeState::decode(&mut value) { - Ok(state) => { - return state; + match Geode::::decode(&mut value) { + Ok(geode) => { + return Ok(geode.state); } - Err(_) => warn!("unable to decode GeodeState"), + Err(_) => warn!("unable to decode Geode"), } } None => warn!("empty change set"), }; } - GeodeState::Null + Err(Error::internal_error()) } fn client_err(_: sp_blockchain::Error) -> Error {