diff --git a/Cargo.lock b/Cargo.lock index 20ed87326b36c..b0d53430bf557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3548,6 +3548,7 @@ dependencies = [ "futures 0.3.5", "hex-literal", "jsonrpc-core", + "jsonrpc-pubsub", "log", "nix", "node-executor", @@ -3680,6 +3681,7 @@ name = "node-rpc" version = "2.0.0-rc5" dependencies = [ "jsonrpc-core", + "jsonrpc-pubsub", "node-primitives", "node-runtime", "pallet-contracts-rpc", @@ -3698,7 +3700,6 @@ dependencies = [ "sp-blockchain", "sp-consensus", "sp-consensus-babe", - "sp-runtime", "sp-transaction-pool", "substrate-frame-rpc-system", ] @@ -6622,11 +6623,23 @@ dependencies = [ "jsonrpc-core", "jsonrpc-core-client", "jsonrpc-derive", + "jsonrpc-pubsub", + "lazy_static", "log", + "parity-scale-codec", + "sc-block-builder", "sc-finality-grandpa", + "sc-network-test", + "sc-rpc", "serde", "serde_json", + "sp-blockchain", + "sp-consensus", "sp-core", + "sp-finality-grandpa", + "sp-keyring", + "sp-runtime", + "substrate-test-runtime-client", ] [[package]] diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 021d0ac8f7d1e..4eba4fdd093e9 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -113,7 +113,7 @@ pub fn new_full(config: Configuration) -> Result { let client = client.clone(); let pool = transaction_pool.clone(); - Box::new(move |deny_unsafe| { + Box::new(move |deny_unsafe, _| { let deps = crate::rpc::FullDeps { client: client.clone(), pool: pool.clone(), @@ -278,7 +278,7 @@ pub fn new_light(config: Configuration) -> Result { transaction_pool, task_manager: &mut task_manager, on_demand: Some(on_demand), - rpc_extensions_builder: Box::new(|_| ()), + rpc_extensions_builder: Box::new(|_, _| ()), telemetry_connection_sinks: sc_service::TelemetryConnectionSinks::default(), config, client, diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index 4fbb48513b3ce..2f0124482e22b 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -39,6 +39,7 @@ serde = { version = "1.0.102", features = ["derive"] } futures = { version = "0.3.1", features = ["compat"] } hex-literal = "0.2.1" jsonrpc-core = "14.2.0" +jsonrpc-pubsub = "14.2.0" log = "0.4.8" rand = "0.7.2" structopt = { version = "0.3.8", optional = true } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index fdfa6816296aa..c332b95a5183f 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -49,7 +49,10 @@ pub fn new_partial(config: &Configuration) -> Result, sc_transaction_pool::FullPool, ( - impl Fn(node_rpc::DenyUnsafe) -> node_rpc::IoHandler, + impl Fn( + node_rpc::DenyUnsafe, + jsonrpc_pubsub::manager::SubscriptionManager + ) -> node_rpc::IoHandler, ( sc_consensus_babe::BabeBlockImport, grandpa::LinkHalf, @@ -101,6 +104,7 @@ pub fn new_partial(config: &Configuration) -> Result Result Result { @@ -75,6 +76,10 @@ pub struct GrandpaDeps { pub shared_voter_state: SharedVoterState, /// Authority set info. pub shared_authority_set: SharedAuthoritySet, + /// Receives notifications about justification events from Grandpa. + pub justification_stream: GrandpaJustificationStream, + /// Subscription manager to keep track of pubsub subscribers. + pub subscriptions: SubscriptionManager, } /// Full client dependencies. @@ -97,9 +102,9 @@ pub struct FullDeps { pub type IoHandler = jsonrpc_core::IoHandler; /// Instantiate all Full RPC extensions. -pub fn create_full( +pub fn create_full( deps: FullDeps, -) -> jsonrpc_core::IoHandler where +) -> jsonrpc_core::IoHandler where C: ProvideRuntimeApi, C: HeaderBackend + HeaderMetadata + 'static, C: Send + Sync + 'static, @@ -109,7 +114,6 @@ pub fn create_full( C::Api: BabeApi, C::Api: BlockBuilder, P: TransactionPool + 'static, - M: jsonrpc_core::Metadata + Default, SC: SelectChain +'static, { use substrate_frame_rpc_system::{FullSystem, SystemApi}; @@ -125,6 +129,7 @@ pub fn create_full( babe, grandpa, } = deps; + let BabeDeps { keystore, babe_config, @@ -133,6 +138,8 @@ pub fn create_full( let GrandpaDeps { shared_voter_state, shared_authority_set, + justification_stream, + subscriptions, } = grandpa; io.extend_with( @@ -161,7 +168,12 @@ pub fn create_full( ); io.extend_with( sc_finality_grandpa_rpc::GrandpaApi::to_delegate( - GrandpaRpcHandler::new(shared_authority_set, shared_voter_state) + GrandpaRpcHandler::new( + shared_authority_set, + shared_voter_state, + justification_stream, + subscriptions, + ) ) ); diff --git a/client/finality-grandpa/rpc/Cargo.toml b/client/finality-grandpa/rpc/Cargo.toml index f8f567c02e78b..ca405eaec9dcb 100644 --- a/client/finality-grandpa/rpc/Cargo.toml +++ b/client/finality-grandpa/rpc/Cargo.toml @@ -8,16 +8,29 @@ edition = "2018" license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [dependencies] +sc-rpc = { version = "2.0.0-rc5", path = "../../rpc" } +sp-runtime = { version = "2.0.0-rc5", path = "../../../primitives/runtime" } sc-finality-grandpa = { version = "0.8.0-rc5", path = "../" } finality-grandpa = { version = "0.12.3", features = ["derive-codec"] } jsonrpc-core = "14.2.0" jsonrpc-core-client = "14.2.0" jsonrpc-derive = "14.2.1" +jsonrpc-pubsub = "14.2.0" futures = { version = "0.3.4", features = ["compat"] } serde = { version = "1.0.105", features = ["derive"] } serde_json = "1.0.50" log = "0.4.8" derive_more = "0.99.2" +parity-scale-codec = { version = "1.3.0", features = ["derive"] } [dev-dependencies] +sc-block-builder = { version = "0.8.0-rc5", path = "../../block-builder" } +sc-network-test = { version = "0.8.0-rc5", path = "../../network/test" } +sc-rpc = { version = "2.0.0-rc5", path = "../../rpc", features = ["test-helpers"] } +sp-blockchain = { version = "2.0.0-rc5", path = "../../../primitives/blockchain" } +sp-consensus = { version = "0.8.0-rc5", path = "../../../primitives/consensus/common" } sp-core = { version = "2.0.0-rc5", path = "../../../primitives/core" } +sp-finality-grandpa = { version = "2.0.0-rc5", path = "../../../primitives/finality-grandpa" } +sp-keyring = { version = "2.0.0-rc5", path = "../../../primitives/keyring" } +substrate-test-runtime-client = { version = "2.0.0-rc5", path = "../../../test-utils/runtime/client" } +lazy_static = "1.4" diff --git a/client/finality-grandpa/rpc/src/lib.rs b/client/finality-grandpa/rpc/src/lib.rs index 1af84b7a84413..c00c95c5f776f 100644 --- a/client/finality-grandpa/rpc/src/lib.rs +++ b/client/finality-grandpa/rpc/src/lib.rs @@ -19,13 +19,25 @@ //! RPC API for GRANDPA. #![warn(missing_docs)] -use futures::{FutureExt, TryFutureExt}; +use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt}; +use log::warn; use jsonrpc_derive::rpc; +use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; +use jsonrpc_core::futures::{ + sink::Sink as Sink01, + stream::Stream as Stream01, + future::Future as Future01, +}; mod error; +mod notification; mod report; +use sc_finality_grandpa::GrandpaJustificationStream; +use sp_runtime::traits::Block as BlockT; + use report::{ReportAuthoritySet, ReportVoterState, ReportedRoundStates}; +use notification::JustificationNotification; /// Returned when Grandpa RPC endpoint is not ready. pub const NOT_READY_ERROR_CODE: i64 = 1; @@ -35,48 +47,128 @@ type FutureResult = /// Provides RPC methods for interacting with GRANDPA. #[rpc] -pub trait GrandpaApi { +pub trait GrandpaApi { + /// RPC Metadata + type Metadata; + /// Returns the state of the current best round state as well as the /// ongoing background rounds. #[rpc(name = "grandpa_roundState")] fn round_state(&self) -> FutureResult; + + /// Returns the block most recently finalized by Grandpa, alongside + /// side its justification. + #[pubsub( + subscription = "grandpa_justifications", + subscribe, + name = "grandpa_subscribeJustifications" + )] + fn subscribe_justifications( + &self, + metadata: Self::Metadata, + subscriber: Subscriber + ); + + /// Unsubscribe from receiving notifications about recently finalized blocks. + #[pubsub( + subscription = "grandpa_justifications", + unsubscribe, + name = "grandpa_unsubscribeJustifications" + )] + fn unsubscribe_justifications( + &self, + metadata: Option, + id: SubscriptionId + ) -> jsonrpc_core::Result; } /// Implements the GrandpaApi RPC trait for interacting with GRANDPA. -pub struct GrandpaRpcHandler { +pub struct GrandpaRpcHandler { authority_set: AuthoritySet, voter_state: VoterState, + justification_stream: GrandpaJustificationStream, + manager: SubscriptionManager, } -impl GrandpaRpcHandler { - /// Creates a new GrandpaRpcHander instance. - pub fn new(authority_set: AuthoritySet, voter_state: VoterState) -> Self { +impl GrandpaRpcHandler { + /// Creates a new GrandpaRpcHandler instance. + pub fn new( + authority_set: AuthoritySet, + voter_state: VoterState, + justification_stream: GrandpaJustificationStream, + manager: SubscriptionManager, + ) -> Self { Self { authority_set, voter_state, + justification_stream, + manager, } } } -impl GrandpaApi for GrandpaRpcHandler +impl GrandpaApi + for GrandpaRpcHandler where VoterState: ReportVoterState + Send + Sync + 'static, AuthoritySet: ReportAuthoritySet + Send + Sync + 'static, + Block: BlockT, { + type Metadata = sc_rpc::Metadata; + fn round_state(&self) -> FutureResult { let round_states = ReportedRoundStates::from(&self.authority_set, &self.voter_state); let future = async move { round_states }.boxed(); Box::new(future.map_err(jsonrpc_core::Error::from).compat()) } + + fn subscribe_justifications( + &self, + _metadata: Self::Metadata, + subscriber: Subscriber + ) { + let stream = self.justification_stream.subscribe() + .map(|x| Ok::<_,()>(JustificationNotification::from(x))) + .map_err(|e| warn!("Notification stream error: {:?}", e)) + .compat(); + + self.manager.add(subscriber, |sink| { + let stream = stream.map(|res| Ok(res)); + sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream) + .map(|_| ()) + }); + } + + fn unsubscribe_justifications( + &self, + _metadata: Option, + id: SubscriptionId + ) -> jsonrpc_core::Result { + Ok(self.manager.cancel(id)) + } } #[cfg(test)] mod tests { use super::*; - use jsonrpc_core::IoHandler; - use sc_finality_grandpa::{report, AuthorityId}; + use std::{collections::HashSet, convert::TryInto, sync::Arc}; + use jsonrpc_core::{Notification, Output, types::Params}; + + use parity_scale_codec::Decode; + use sc_block_builder::BlockBuilder; + use sc_finality_grandpa::{report, AuthorityId, GrandpaJustificationSender, GrandpaJustification}; + use sp_blockchain::HeaderBackend; + use sp_consensus::RecordProof; use sp_core::crypto::Public; - use std::{collections::HashSet, convert::TryInto}; + use sp_keyring::Ed25519Keyring; + use sp_runtime::traits::Header as HeaderT; + use substrate_test_runtime_client::{ + runtime::Block, + DefaultTestClientBuilderExt, + TestClientBuilderExt, + TestClientBuilder, + }; struct TestAuthoritySet; struct TestVoterState; @@ -106,7 +198,7 @@ mod tests { let voter_id_1 = AuthorityId::from_slice(&[1; 32]); let voters_best: HashSet<_> = vec![voter_id_1].into_iter().collect(); - let best_round_state = report::RoundState { + let best_round_state = sc_finality_grandpa::report::RoundState { total_weight: 100_u64.try_into().unwrap(), threshold_weight: 67_u64.try_into().unwrap(), prevote_current_weight: 50.into(), @@ -115,7 +207,7 @@ mod tests { precommit_ids: HashSet::new(), }; - let past_round_state = report::RoundState { + let past_round_state = sc_finality_grandpa::report::RoundState { total_weight: 100_u64.try_into().unwrap(), threshold_weight: 67_u64.try_into().unwrap(), prevote_current_weight: 100.into(), @@ -133,23 +225,42 @@ mod tests { } } + fn setup_io_handler(voter_state: VoterState) -> ( + jsonrpc_core::MetaIoHandler, + GrandpaJustificationSender, + ) where + VoterState: ReportVoterState + Send + Sync + 'static, + { + let (justification_sender, justification_stream) = GrandpaJustificationStream::channel(); + let manager = SubscriptionManager::new(Arc::new(sc_rpc::testing::TaskExecutor)); + + let handler = GrandpaRpcHandler::new( + TestAuthoritySet, + voter_state, + justification_stream, + manager, + ); + + let mut io = jsonrpc_core::MetaIoHandler::default(); + io.extend_with(GrandpaApi::to_delegate(handler)); + + (io, justification_sender) + } + #[test] fn uninitialized_rpc_handler() { - let handler = GrandpaRpcHandler::new(TestAuthoritySet, EmptyVoterState); - let mut io = IoHandler::new(); - io.extend_with(GrandpaApi::to_delegate(handler)); + let (io, _) = setup_io_handler(EmptyVoterState); let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":1}"#; let response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"GRANDPA RPC endpoint not ready"},"id":1}"#; - assert_eq!(Some(response.into()), io.handle_request_sync(request)); + let meta = sc_rpc::Metadata::default(); + assert_eq!(Some(response.into()), io.handle_request_sync(request, meta)); } #[test] fn working_rpc_handler() { - let handler = GrandpaRpcHandler::new(TestAuthoritySet, TestVoterState); - let mut io = IoHandler::new(); - io.extend_with(GrandpaApi::to_delegate(handler)); + let (io, _) = setup_io_handler(TestVoterState); let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":1}"#; let response = "{\"jsonrpc\":\"2.0\",\"result\":{\ @@ -166,6 +277,154 @@ mod tests { \"setId\":1\ },\"id\":1}"; - assert_eq!(io.handle_request_sync(request), Some(response.into())); + let meta = sc_rpc::Metadata::default(); + assert_eq!(io.handle_request_sync(request, meta), Some(response.into())); + } + + fn setup_session() -> (sc_rpc::Metadata, jsonrpc_core::futures::sync::mpsc::Receiver) { + let (tx, rx) = jsonrpc_core::futures::sync::mpsc::channel(1); + let meta = sc_rpc::Metadata::new(tx); + (meta, rx) + } + + #[test] + fn subscribe_and_unsubscribe_to_justifications() { + let (io, _) = setup_io_handler(TestVoterState); + let (meta, _) = setup_session(); + + // Subscribe + let sub_request = r#"{"jsonrpc":"2.0","method":"grandpa_subscribeJustifications","params":[],"id":1}"#; + let resp = io.handle_request_sync(sub_request, meta.clone()); + let resp: Output = serde_json::from_str(&resp.unwrap()).unwrap(); + + let sub_id = match resp { + Output::Success(success) => success.result, + _ => panic!(), + }; + + // Unsubscribe + let unsub_req = format!( + "{{\"jsonrpc\":\"2.0\",\"method\":\"grandpa_unsubscribeJustifications\",\"params\":[{}],\"id\":1}}", + sub_id + ); + assert_eq!( + io.handle_request_sync(&unsub_req, meta.clone()), + Some(r#"{"jsonrpc":"2.0","result":true,"id":1}"#.into()), + ); + + // Unsubscribe again and fail + assert_eq!( + io.handle_request_sync(&unsub_req, meta), + Some(r#"{"jsonrpc":"2.0","result":false,"id":1}"#.into()), + ); + } + + #[test] + fn subscribe_and_unsubscribe_with_wrong_id() { + let (io, _) = setup_io_handler(TestVoterState); + let (meta, _) = setup_session(); + + // Subscribe + let sub_request = r#"{"jsonrpc":"2.0","method":"grandpa_subscribeJustifications","params":[],"id":1}"#; + let resp = io.handle_request_sync(sub_request, meta.clone()); + let resp: Output = serde_json::from_str(&resp.unwrap()).unwrap(); + assert!(matches!(resp, Output::Success(_))); + + // Unsubscribe with wrong ID + assert_eq!( + io.handle_request_sync( + r#"{"jsonrpc":"2.0","method":"grandpa_unsubscribeJustifications","params":["FOO"],"id":1}"#, + meta.clone() + ), + Some(r#"{"jsonrpc":"2.0","result":false,"id":1}"#.into()) + ); + } + + fn create_justification() -> GrandpaJustification { + let peers = &[Ed25519Keyring::Alice]; + + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = builder.build(); + let client = Arc::new(client); + + let built_block = BlockBuilder::new( + &*client, + client.info().best_hash, + client.info().best_number, + RecordProof::Yes, + Default::default(), + &*backend, + ).unwrap().build().unwrap(); + + let block = built_block.block; + let block_hash = block.hash(); + + let justification = { + let round = 1; + let set_id = 0; + + let precommit = finality_grandpa::Precommit { + target_hash: block_hash, + target_number: *block.header.number(), + }; + + let msg = finality_grandpa::Message::Precommit(precommit.clone()); + let encoded = sp_finality_grandpa::localized_payload(round, set_id, &msg); + let signature = peers[0].sign(&encoded[..]).into(); + + let precommit = finality_grandpa::SignedPrecommit { + precommit, + signature, + id: peers[0].public().into(), + }; + + let commit = finality_grandpa::Commit { + target_hash: block_hash, + target_number: *block.header.number(), + precommits: vec![precommit], + }; + + GrandpaJustification::from_commit(&client, round, commit).unwrap() + }; + + justification + } + + #[test] + fn subscribe_and_listen_to_one_justification() { + let (io, justification_sender) = setup_io_handler(TestVoterState); + let (meta, receiver) = setup_session(); + + // Subscribe + let sub_request = + r#"{"jsonrpc":"2.0","method":"grandpa_subscribeJustifications","params":[],"id":1}"#; + + let resp = io.handle_request_sync(sub_request, meta.clone()); + let mut resp: serde_json::Value = serde_json::from_str(&resp.unwrap()).unwrap(); + let sub_id: String = serde_json::from_value(resp["result"].take()).unwrap(); + + // Notify with a header and justification + let justification = create_justification(); + let _ = justification_sender.notify(justification.clone()).unwrap(); + + // Inspect what we received + let recv = receiver.take(1).wait().flatten().collect::>(); + let recv: Notification = serde_json::from_str(&recv[0]).unwrap(); + let mut json_map = match recv.params { + Params::Map(json_map) => json_map, + _ => panic!(), + }; + + let recv_sub_id: String = + serde_json::from_value(json_map["subscription"].take()).unwrap(); + let recv_justification: Vec = + serde_json::from_value(json_map["result"].take()).unwrap(); + let recv_justification: GrandpaJustification = + Decode::decode(&mut &recv_justification[..]).unwrap(); + + assert_eq!(recv.method, "grandpa_justifications"); + assert_eq!(recv_sub_id, sub_id); + assert_eq!(recv_justification, justification); } } diff --git a/client/finality-grandpa/rpc/src/notification.rs b/client/finality-grandpa/rpc/src/notification.rs new file mode 100644 index 0000000000000..831f4681549a7 --- /dev/null +++ b/client/finality-grandpa/rpc/src/notification.rs @@ -0,0 +1,32 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use serde::{Serialize, Deserialize}; +use parity_scale_codec::Encode; +use sp_runtime::traits::Block as BlockT; +use sc_finality_grandpa::GrandpaJustification; + +/// An encoded justification proving that the given header has been finalized +#[derive(Clone, Serialize, Deserialize)] +pub struct JustificationNotification(Vec); + +impl From> for JustificationNotification { + fn from(notification: GrandpaJustification) -> Self { + JustificationNotification(notification.encode()) + } +} diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index ca47e5e2cc4c5..a7a29fe0e8a65 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -51,6 +51,7 @@ use sp_consensus::SelectChain; use crate::authorities::{AuthoritySet, SharedAuthoritySet}; use crate::communication::Network as NetworkT; use crate::consensus_changes::SharedConsensusChanges; +use crate::notification::GrandpaJustificationSender; use crate::justification::GrandpaJustification; use crate::until_imported::UntilVoteTargetImported; use crate::voting_rule::VotingRule; @@ -390,7 +391,6 @@ impl Metrics { } } - /// The environment we run GRANDPA in. pub(crate) struct Environment, SC, VR> { pub(crate) client: Arc, @@ -404,6 +404,7 @@ pub(crate) struct Environment, SC, pub(crate) voter_set_state: SharedVoterSetState, pub(crate) voting_rule: VR, pub(crate) metrics: Option, + pub(crate) justification_sender: Option>, pub(crate) _phantom: PhantomData, } @@ -1022,6 +1023,7 @@ where number, (round, commit).into(), false, + &self.justification_sender, ) } @@ -1086,6 +1088,7 @@ pub(crate) fn finalize_block( number: NumberFor, justification_or_commit: JustificationOrCommit, initial_sync: bool, + justification_sender: &Option>, ) -> Result<(), CommandOrError>> where Block: BlockT, BE: Backend, @@ -1097,6 +1100,7 @@ pub(crate) fn finalize_block( let mut authority_set = authority_set.inner().write(); let status = client.info(); + if number <= status.finalized_number && client.hash(number)? == Some(hash) { // This can happen after a forced change (triggered by the finality tracker when finality is stalled), since // the voter will be restarted at the median last finalized block, which can be lower than the local best @@ -1157,7 +1161,7 @@ pub(crate) fn finalize_block( // justifications for transition blocks which will be requested by // syncing clients. let justification = match justification_or_commit { - JustificationOrCommit::Justification(justification) => Some(justification.encode()), + JustificationOrCommit::Justification(justification) => Some(justification), JustificationOrCommit::Commit((round_number, commit)) => { let mut justification_required = // justification is always required when block that enacts new authorities @@ -1184,13 +1188,22 @@ pub(crate) fn finalize_block( commit, )?; - Some(justification.encode()) + Some(justification) } else { None } }, }; + // Notify any registered listeners in case we have a justification + if let Some(sender) = justification_sender { + if let Some(ref justification) = justification { + let _ = sender.notify(justification.clone()); + } + } + + let justification = justification.map(|j| j.encode()); + debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash); // ideally some handle to a synchronization oracle would be used diff --git a/client/finality-grandpa/src/finality_proof.rs b/client/finality-grandpa/src/finality_proof.rs index f334ddde2b990..2ac9ec57f3df4 100644 --- a/client/finality-grandpa/src/finality_proof.rs +++ b/client/finality-grandpa/src/finality_proof.rs @@ -149,7 +149,7 @@ impl AuthoritySetForFinalityChecker for Arc { +pub struct FinalityProofProvider { backend: Arc, authority_provider: Arc>, } diff --git a/client/finality-grandpa/src/import.rs b/client/finality-grandpa/src/import.rs index b37ab7907a62e..c9f2d8d2f7bcc 100644 --- a/client/finality-grandpa/src/import.rs +++ b/client/finality-grandpa/src/import.rs @@ -44,6 +44,7 @@ use crate::authorities::{AuthoritySet, SharedAuthoritySet, DelayKind, PendingCha use crate::consensus_changes::SharedConsensusChanges; use crate::environment::finalize_block; use crate::justification::GrandpaJustification; +use crate::notification::GrandpaJustificationSender; use std::marker::PhantomData; /// A block-import handler for GRANDPA. @@ -62,6 +63,7 @@ pub struct GrandpaBlockImport { send_voter_commands: TracingUnboundedSender>>, consensus_changes: SharedConsensusChanges>, authority_set_hard_forks: HashMap>>, + justification_sender: GrandpaJustificationSender, _phantom: PhantomData, } @@ -76,6 +78,7 @@ impl Clone for send_voter_commands: self.send_voter_commands.clone(), consensus_changes: self.consensus_changes.clone(), authority_set_hard_forks: self.authority_set_hard_forks.clone(), + justification_sender: self.justification_sender.clone(), _phantom: PhantomData, } } @@ -560,6 +563,7 @@ impl GrandpaBlockImport>>, consensus_changes: SharedConsensusChanges>, authority_set_hard_forks: Vec<(SetId, PendingChange>)>, + justification_sender: GrandpaJustificationSender, ) -> GrandpaBlockImport { // check for and apply any forced authority set hard fork that applies // to the *current* authority set. @@ -603,6 +607,7 @@ impl GrandpaBlockImport { round: u64, pub(crate) commit: Commit, @@ -47,7 +47,7 @@ pub struct GrandpaJustification { impl GrandpaJustification { /// Create a GRANDPA justification from the given commit. This method /// assumes the commit is valid and well-formed. - pub(crate) fn from_commit( + pub fn from_commit( client: &Arc, round: u64, commit: Commit, diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 7d74d0eebfc48..a586dc8e31f6a 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -119,12 +119,14 @@ mod finality_proof; mod import; mod justification; mod light_import; +mod notification; mod observer; mod until_imported; mod voting_rule; pub use authorities::SharedAuthoritySet; pub use finality_proof::{FinalityProofProvider, StorageAndProofProvider}; +pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream}; pub use import::GrandpaBlockImport; pub use justification::GrandpaJustification; pub use light_import::{light_block_import, GrandpaLightBlockImport}; @@ -448,6 +450,8 @@ pub struct LinkHalf { select_chain: SC, persistent_data: PersistentData, voter_commands_rx: TracingUnboundedReceiver>>, + justification_sender: GrandpaJustificationSender, + justification_stream: GrandpaJustificationStream, } impl LinkHalf { @@ -455,6 +459,11 @@ impl LinkHalf { pub fn shared_authority_set(&self) -> &SharedAuthoritySet> { &self.persistent_data.authority_set } + + /// Get the receiving end of justification notifications. + pub fn justification_stream(&self) -> GrandpaJustificationStream { + self.justification_stream.clone() + } } /// Provider for the Grandpa authority set configured on the genesis block. @@ -553,6 +562,9 @@ where let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command"); + let (justification_sender, justification_stream) = + GrandpaJustificationStream::channel(); + // create pending change objects with 0 delay and enacted on finality // (i.e. standard changes) for each authority set hard fork. let authority_set_hard_forks = authority_set_hard_forks @@ -579,12 +591,15 @@ where voter_commands_tx, persistent_data.consensus_changes.clone(), authority_set_hard_forks, + justification_sender.clone(), ), LinkHalf { client, select_chain, persistent_data, voter_commands_rx, + justification_sender, + justification_stream, }, )) } @@ -719,6 +734,8 @@ pub fn run_grandpa_voter( select_chain, persistent_data, voter_commands_rx, + justification_sender, + justification_stream: _, } = link; let network = NetworkBridge::new( @@ -767,6 +784,7 @@ pub fn run_grandpa_voter( voter_commands_rx, prometheus_registry, shared_voter_state, + justification_sender, ); let voter_work = voter_work @@ -827,6 +845,7 @@ where voter_commands_rx: TracingUnboundedReceiver>>, prometheus_registry: Option, shared_voter_state: SharedVoterState, + justification_sender: GrandpaJustificationSender, ) -> Self { let metrics = match prometheus_registry.as_ref().map(Metrics::register) { Some(Ok(metrics)) => Some(metrics), @@ -850,6 +869,7 @@ where consensus_changes: persistent_data.consensus_changes.clone(), voter_set_state: persistent_data.set_state, metrics: metrics.as_ref().map(|m| m.environment.clone()), + justification_sender: Some(justification_sender), _phantom: PhantomData, }); @@ -988,6 +1008,7 @@ where network: self.env.network.clone(), voting_rule: self.env.voting_rule.clone(), metrics: self.env.metrics.clone(), + justification_sender: self.env.justification_sender.clone(), _phantom: PhantomData, }); diff --git a/client/finality-grandpa/src/notification.rs b/client/finality-grandpa/src/notification.rs new file mode 100644 index 0000000000000..16f705f0eeb1f --- /dev/null +++ b/client/finality-grandpa/src/notification.rs @@ -0,0 +1,102 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::sync::Arc; +use parking_lot::Mutex; + +use sp_runtime::traits::Block as BlockT; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; + +use crate::justification::GrandpaJustification; + +// Stream of justifications returned when subscribing. +type JustificationStream = TracingUnboundedReceiver>; + +// Sending endpoint for notifying about justifications. +type JustificationSender = TracingUnboundedSender>; + +// Collection of channel sending endpoints shared with the receiver side so they can register +// themselves. +type SharedJustificationSenders = Arc>>>; + +/// The sending half of the Grandpa justification channel(s). +/// +/// Used to send notifications about justifications generated +/// at the end of a Grandpa round. +#[derive(Clone)] +pub struct GrandpaJustificationSender { + subscribers: SharedJustificationSenders +} + +impl GrandpaJustificationSender { + /// The `subscribers` should be shared with a corresponding + /// `GrandpaJustificationStream`. + fn new(subscribers: SharedJustificationSenders) -> Self { + Self { + subscribers, + } + } + + /// Send out a notification to all subscribers that a new justification + /// is available for a block. + pub fn notify(&self, notification: GrandpaJustification) -> Result<(), ()> { + self.subscribers.lock().retain(|n| { + !n.is_closed() && n.unbounded_send(notification.clone()).is_ok() + }); + Ok(()) + } +} + +/// The receiving half of the Grandpa justification channel. +/// +/// Used to receive notifications about justifications generated +/// at the end of a Grandpa round. +/// The `GrandpaJustificationStream` entity stores the `SharedJustificationSenders` +/// so it can be used to add more subscriptions. +#[derive(Clone)] +pub struct GrandpaJustificationStream { + subscribers: SharedJustificationSenders +} + +impl GrandpaJustificationStream { + /// Creates a new pair of receiver and sender of justification notifications. + pub fn channel() -> (GrandpaJustificationSender, Self) { + let subscribers = Arc::new(Mutex::new(vec![])); + let receiver = GrandpaJustificationStream::new(subscribers.clone()); + let sender = GrandpaJustificationSender::new(subscribers.clone()); + (sender, receiver) + } + + /// Create a new receiver of justification notifications. + /// + /// The `subscribers` should be shared with a corresponding + /// `GrandpaJustificationSender`. + fn new(subscribers: SharedJustificationSenders) -> Self { + Self { + subscribers, + } + } + + /// Subscribe to a channel through which justifications are sent + /// at the end of each Grandpa voting round. + pub fn subscribe(&self) -> JustificationStream { + let (sender, receiver) = tracing_unbounded("mpsc_justification_notification_stream"); + self.subscribers.lock().push(sender); + receiver + } +} diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 6a7a1f07b0512..8fb536a369751 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -40,6 +40,7 @@ use crate::{ use crate::authorities::SharedAuthoritySet; use crate::communication::{Network as NetworkT, NetworkBridge}; use crate::consensus_changes::SharedConsensusChanges; +use crate::notification::GrandpaJustificationSender; use sp_finality_grandpa::AuthorityId; use std::marker::{PhantomData, Unpin}; @@ -69,6 +70,7 @@ fn grandpa_observer( authority_set: &SharedAuthoritySet>, consensus_changes: &SharedConsensusChanges>, voters: &Arc>, + justification_sender: &Option>, last_finalized_number: NumberFor, commits: S, note_round: F, @@ -85,6 +87,7 @@ fn grandpa_observer( let consensus_changes = consensus_changes.clone(); let client = client.clone(); let voters = voters.clone(); + let justification_sender = justification_sender.clone(); let observer = commits.try_fold(last_finalized_number, move |last_finalized_number, global| { let (round, commit, callback) = match global { @@ -127,6 +130,7 @@ fn grandpa_observer( finalized_number, (round, commit).into(), false, + &justification_sender, ) { Ok(_) => {}, Err(e) => return future::err(e), @@ -177,6 +181,7 @@ where select_chain: _, persistent_data, voter_commands_rx, + justification_sender, .. } = link; @@ -192,7 +197,8 @@ where network, persistent_data, config.keystore, - voter_commands_rx + voter_commands_rx, + Some(justification_sender), ); let observer_work = observer_work @@ -213,6 +219,7 @@ struct ObserverWork> { persistent_data: PersistentData, keystore: Option, voter_commands_rx: TracingUnboundedReceiver>>, + justification_sender: Option>, _phantom: PhantomData, } @@ -230,6 +237,7 @@ where persistent_data: PersistentData, keystore: Option, voter_commands_rx: TracingUnboundedReceiver>>, + justification_sender: Option>, ) -> Self { let mut work = ObserverWork { @@ -241,6 +249,7 @@ where persistent_data, keystore: keystore.clone(), voter_commands_rx, + justification_sender, _phantom: PhantomData, }; work.rebuild_observer(); @@ -287,6 +296,7 @@ where &self.persistent_data.authority_set, &self.persistent_data.consensus_changes, &voters, + &self.justification_sender, last_finalized_number, global_in, note_round, @@ -422,12 +432,14 @@ mod tests { ).unwrap(); let (_tx, voter_command_rx) = tracing_unbounded(""); + let observer = ObserverWork::new( client, tester.net_handle.clone(), persistent_data, None, voter_command_rx, + None, ); // Trigger a reputation change through the gossip validator. diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index e2cdd7653a64f..d2905e4da4453 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -1567,6 +1567,7 @@ where network, voting_rule, metrics: None, + justification_sender: None, _phantom: PhantomData, } } diff --git a/client/rpc-api/src/lib.rs b/client/rpc-api/src/lib.rs index cd2608dda92be..7bae75181056f 100644 --- a/client/rpc-api/src/lib.rs +++ b/client/rpc-api/src/lib.rs @@ -22,10 +22,12 @@ mod errors; mod helpers; +mod metadata; mod policy; -pub use jsonrpc_core::IoHandlerExtension as RpcExtension; pub use helpers::Receiver; +pub use jsonrpc_core::IoHandlerExtension as RpcExtension; +pub use metadata::Metadata; pub use policy::DenyUnsafe; pub mod author; diff --git a/client/rpc/src/metadata.rs b/client/rpc-api/src/metadata.rs similarity index 95% rename from client/rpc/src/metadata.rs rename to client/rpc-api/src/metadata.rs index 0416b07a6797b..cffcbf61f5440 100644 --- a/client/rpc/src/metadata.rs +++ b/client/rpc-api/src/metadata.rs @@ -19,8 +19,8 @@ //! RPC Metadata use std::sync::Arc; +use jsonrpc_core::futures::sync::mpsc; use jsonrpc_pubsub::{Session, PubSubMetadata}; -use rpc::futures::sync::mpsc; /// RPC Metadata. /// @@ -32,7 +32,7 @@ pub struct Metadata { session: Option>, } -impl rpc::Metadata for Metadata {} +impl jsonrpc_core::Metadata for Metadata {} impl PubSubMetadata for Metadata { fn session(&self) -> Option> { self.session.clone() diff --git a/client/rpc/Cargo.toml b/client/rpc/Cargo.toml index 9c91fa3bc075a..5681672cc3457 100644 --- a/client/rpc/Cargo.toml +++ b/client/rpc/Cargo.toml @@ -37,6 +37,7 @@ sp-transaction-pool = { version = "2.0.0-rc5", path = "../../primitives/transact sp-blockchain = { version = "2.0.0-rc5", path = "../../primitives/blockchain" } hash-db = { version = "0.15.2", default-features = false } parking_lot = "0.10.0" +lazy_static = { version = "1.4.0", optional = true } [dev-dependencies] assert_matches = "1.3.0" @@ -46,4 +47,6 @@ sp-io = { version = "2.0.0-rc5", path = "../../primitives/io" } substrate-test-runtime-client = { version = "2.0.0-rc5", path = "../../test-utils/runtime/client" } tokio = "0.1.22" sc-transaction-pool = { version = "2.0.0-rc5", path = "../transaction-pool" } -lazy_static = "1.4.0" + +[features] +test-helpers = ["lazy_static"] diff --git a/client/rpc/src/author/mod.rs b/client/rpc/src/author/mod.rs index 974c1b85e1b39..e6384c995dce8 100644 --- a/client/rpc/src/author/mod.rs +++ b/client/rpc/src/author/mod.rs @@ -94,7 +94,7 @@ impl AuthorApi, BlockHash

> for Author Client: HeaderBackend + ProvideRuntimeApi + Send + Sync + 'static, Client::Api: SessionKeys, { - type Metadata = crate::metadata::Metadata; + type Metadata = crate::Metadata; fn insert_key( &self, diff --git a/client/rpc/src/chain/mod.rs b/client/rpc/src/chain/mod.rs index 8b6bf19d23565..cb67d9ba23166 100644 --- a/client/rpc/src/chain/mod.rs +++ b/client/rpc/src/chain/mod.rs @@ -106,7 +106,7 @@ trait ChainBackend: Send + Sync + 'static /// All new head subscription fn subscribe_all_heads( &self, - _metadata: crate::metadata::Metadata, + _metadata: crate::Metadata, subscriber: Subscriber, ) { subscribe_headers( @@ -123,7 +123,7 @@ trait ChainBackend: Send + Sync + 'static /// Unsubscribe from all head subscription. fn unsubscribe_all_heads( &self, - _metadata: Option, + _metadata: Option, id: SubscriptionId, ) -> RpcResult { Ok(self.subscriptions().cancel(id)) @@ -132,7 +132,7 @@ trait ChainBackend: Send + Sync + 'static /// New best head subscription fn subscribe_new_heads( &self, - _metadata: crate::metadata::Metadata, + _metadata: crate::Metadata, subscriber: Subscriber, ) { subscribe_headers( @@ -150,7 +150,7 @@ trait ChainBackend: Send + Sync + 'static /// Unsubscribe from new best head subscription. fn unsubscribe_new_heads( &self, - _metadata: Option, + _metadata: Option, id: SubscriptionId, ) -> RpcResult { Ok(self.subscriptions().cancel(id)) @@ -159,7 +159,7 @@ trait ChainBackend: Send + Sync + 'static /// Finalized head subscription fn subscribe_finalized_heads( &self, - _metadata: crate::metadata::Metadata, + _metadata: crate::Metadata, subscriber: Subscriber, ) { subscribe_headers( @@ -176,7 +176,7 @@ trait ChainBackend: Send + Sync + 'static /// Unsubscribe from finalized head subscription. fn unsubscribe_finalized_heads( &self, - _metadata: Option, + _metadata: Option, id: SubscriptionId, ) -> RpcResult { Ok(self.subscriptions().cancel(id)) @@ -230,7 +230,7 @@ impl ChainApi, Block::Hash, Block::Header, Signe Block: BlockT + 'static, Client: HeaderBackend + BlockchainEvents + 'static, { - type Metadata = crate::metadata::Metadata; + type Metadata = crate::Metadata; fn header(&self, hash: Option) -> FutureResult> { self.backend.header(hash) diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs index 22dccbaa10aa6..434859a39c2f4 100644 --- a/client/rpc/src/lib.rs +++ b/client/rpc/src/lib.rs @@ -27,10 +27,7 @@ use rpc::futures::future::{Executor, ExecuteError, Future}; use sp_core::traits::SpawnNamed; use std::sync::Arc; -mod metadata; - -pub use sc_rpc_api::DenyUnsafe; -pub use self::metadata::Metadata; +pub use sc_rpc_api::{DenyUnsafe, Metadata}; pub use rpc::IoHandlerExtension as RpcExtension; pub mod author; @@ -38,8 +35,9 @@ pub mod chain; pub mod offchain; pub mod state; pub mod system; -#[cfg(test)] -mod testing; + +#[cfg(any(test, feature = "test-helpers"))] +pub mod testing; /// Task executor that is being used by RPC subscriptions. #[derive(Clone)] diff --git a/client/rpc/src/state/mod.rs b/client/rpc/src/state/mod.rs index 921cc7efc699d..ded87c73dc8f7 100644 --- a/client/rpc/src/state/mod.rs +++ b/client/rpc/src/state/mod.rs @@ -140,21 +140,21 @@ pub trait StateBackend: Send + Sync + 'static /// New runtime version subscription fn subscribe_runtime_version( &self, - _meta: crate::metadata::Metadata, + _meta: crate::Metadata, subscriber: Subscriber, ); /// Unsubscribe from runtime version subscription fn unsubscribe_runtime_version( &self, - _meta: Option, + _meta: Option, id: SubscriptionId, ) -> RpcResult; /// New storage subscription fn subscribe_storage( &self, - _meta: crate::metadata::Metadata, + _meta: crate::Metadata, subscriber: Subscriber>, keys: Option>, ); @@ -162,7 +162,7 @@ pub trait StateBackend: Send + Sync + 'static /// Unsubscribe from storage subscription fn unsubscribe_storage( &self, - _meta: Option, + _meta: Option, id: SubscriptionId, ) -> RpcResult; } @@ -230,7 +230,7 @@ impl StateApi for State Block: BlockT + 'static, Client: Send + Sync + 'static, { - type Metadata = crate::metadata::Metadata; + type Metadata = crate::Metadata; fn call(&self, method: String, data: Bytes, block: Option) -> FutureResult { self.backend.call(block, method, data) @@ -390,7 +390,7 @@ impl ChildStateApi for ChildState Block: BlockT + 'static, Client: Send + Sync + 'static, { - type Metadata = crate::metadata::Metadata; + type Metadata = crate::Metadata; fn storage( &self, diff --git a/client/rpc/src/state/state_full.rs b/client/rpc/src/state/state_full.rs index f0ae79a033b58..3a5580c539a4c 100644 --- a/client/rpc/src/state/state_full.rs +++ b/client/rpc/src/state/state_full.rs @@ -373,7 +373,7 @@ impl StateBackend for FullState, ) { let stream = match self.client.storage_changes_notification_stream( @@ -424,7 +424,7 @@ impl StateBackend for FullState, + _meta: Option, id: SubscriptionId, ) -> RpcResult { Ok(self.subscriptions.cancel(id)) @@ -432,7 +432,7 @@ impl StateBackend for FullState>, keys: Option>, ) { @@ -484,7 +484,7 @@ impl StateBackend for FullState, + _meta: Option, id: SubscriptionId, ) -> RpcResult { Ok(self.subscriptions.cancel(id)) diff --git a/client/rpc/src/state/state_light.rs b/client/rpc/src/state/state_light.rs index c7e218541aa5f..01fd1c16018ff 100644 --- a/client/rpc/src/state/state_light.rs +++ b/client/rpc/src/state/state_light.rs @@ -289,7 +289,7 @@ impl StateBackend for LightState>, keys: Option> ) { @@ -384,7 +384,7 @@ impl StateBackend for LightState, + _meta: Option, id: SubscriptionId, ) -> RpcResult { if !self.subscriptions.cancel(id.clone()) { @@ -412,7 +412,7 @@ impl StateBackend for LightState, ) { self.subscriptions.add(subscriber, move |sink| { @@ -459,7 +459,7 @@ impl StateBackend for LightState, + _meta: Option, id: SubscriptionId, ) -> RpcResult { Ok(self.subscriptions.cancel(id)) diff --git a/client/rpc/src/testing.rs b/client/rpc/src/testing.rs index afca07a7fbe6a..9530ff0020644 100644 --- a/client/rpc/src/testing.rs +++ b/client/rpc/src/testing.rs @@ -32,6 +32,7 @@ lazy_static::lazy_static! { type Boxed01Future01 = Box + Send + 'static>; +/// Executor for use in testing pub struct TaskExecutor; impl future01::Executor for TaskExecutor { fn execute( diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index fc9303498d674..eedc4582299d3 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -73,17 +73,17 @@ pub trait RpcExtensionBuilder { /// Returns an instance of the RPC extension for a particular `DenyUnsafe` /// value, e.g. the RPC extension might not expose some unsafe methods. - fn build(&self, deny: sc_rpc::DenyUnsafe) -> Self::Output; + fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output; } impl RpcExtensionBuilder for F where - F: Fn(sc_rpc::DenyUnsafe) -> R, + F: Fn(sc_rpc::DenyUnsafe, SubscriptionManager) -> R, R: sc_rpc::RpcExtension, { type Output = R; - fn build(&self, deny: sc_rpc::DenyUnsafe) -> Self::Output { - (*self)(deny) + fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output { + (*self)(deny, subscriptions) } } @@ -97,7 +97,7 @@ impl RpcExtensionBuilder for NoopRpcExtensionBuilder where { type Output = R; - fn build(&self, _deny: sc_rpc::DenyUnsafe) -> Self::Output { + fn build(&self, _deny: sc_rpc::DenyUnsafe, _subscriptions: SubscriptionManager) -> Self::Output { self.0.clone() } } @@ -764,7 +764,7 @@ fn gen_handler( let author = sc_rpc::author::Author::new( client, transaction_pool, - subscriptions, + subscriptions.clone(), keystore, deny_unsafe, ); @@ -786,7 +786,7 @@ fn gen_handler( maybe_offchain_rpc, author::AuthorApi::to_delegate(author), system::SystemApi::to_delegate(system), - rpc_extensions_builder.build(deny_unsafe), + rpc_extensions_builder.build(deny_unsafe, subscriptions), )) }