From d0b342c4c0d070d5d223ab4b72e783ffd46c87b9 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 16 Jul 2019 15:01:35 +0200 Subject: [PATCH 01/11] core/{service,network}: Put AuthorityId -> MultiAddr on Dht --- Cargo.lock | 1 + core/network/src/lib.rs | 1 + core/network/src/protocol/event.rs | 2 ++ core/network/src/service.rs | 21 ++++++++----- core/service/Cargo.toml | 1 + core/service/src/lib.rs | 47 +++++++++++++++++++++++++----- 6 files changed, 59 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b44d707371fd..841ab3d8cacda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4748,6 +4748,7 @@ dependencies = [ "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "node-executor 2.0.0", "node-primitives 2.0.0", "node-runtime 2.0.0", diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index 28437901e2154..4776b671c6544 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -192,6 +192,7 @@ pub use service::{ NetworkStateInfo, }; pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] diff --git a/core/network/src/protocol/event.rs b/core/network/src/protocol/event.rs index 2edbb0fbf7563..1023ced87b802 100644 --- a/core/network/src/protocol/event.rs +++ b/core/network/src/protocol/event.rs @@ -20,6 +20,7 @@ use libp2p::multihash::Multihash; /// Events generated by DHT as a response to get_value and put_value requests. +#[derive(Debug, Clone)] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Multihash, Vec)>), @@ -35,6 +36,7 @@ pub enum DhtEvent { } /// Type for events generated by networking layer. +#[derive(Debug, Clone)] pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), diff --git a/core/network/src/service.rs b/core/network/src/service.rs index acd3bbeab7b10..8b87241548e6f 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -25,7 +25,7 @@ //! The methods of the [`NetworkService`] are implemented by sending a message over a channel, //! which is then processed by [`NetworkWorker::poll`]. -use std::{collections::HashMap, fs, marker::PhantomData, io, path::Path}; +use std::{collections::{ HashMap, HashSet }, fs, marker::PhantomData, io, path::Path}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use consensus::import_queue::{ImportQueue, Link}; @@ -260,6 +260,12 @@ impl, H: ExHashT> NetworkWorker self.network_service.user_protocol().num_connected_peers() } + /// Returns the local external addresses. + pub fn external_addresses(&self) -> HashSet { + let swarm = &self.network_service; + Swarm::::external_addresses(&swarm).cloned().collect() + } + /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { self.network_service.user_protocol().num_active_peers() @@ -589,11 +595,11 @@ pub struct NetworkWorker, H: Ex on_demand_in: Option>>, } -impl, H: ExHashT> Future for NetworkWorker { - type Item = (); +impl, H: ExHashT> Stream for NetworkWorker { + type Item = Event; type Error = io::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll, Self::Error> { // Poll the import queue for actions to perform. let _ = futures03::future::poll_fn(|cx| { self.import_queue.poll_actions(cx, &mut NetworkLink { @@ -613,7 +619,7 @@ impl, H: ExHashT> Future for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::NotReady), Ok(Async::NotReady) => break, }; @@ -654,8 +660,9 @@ impl, H: ExHashT> Future for Ne Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { self.network_service.user_protocol_mut() - .on_event(Event::Dht(ev)); - CustomMessageOutcome::None + .on_event(Event::Dht(ev.clone())); + + return Ok(Async::Ready(Some(Event::Dht(ev)))); }, Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 4ce3facb52372..e3e54b68e54aa 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -34,6 +34,7 @@ rpc = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" } tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } +libp2p = { version = "0.11.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 9f5de4de1bad4..415a941c5eda0 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -28,10 +28,11 @@ pub mod error; use std::io; use std::marker::PhantomData; use std::net::SocketAddr; -use std::collections::HashMap; use std::time::{Duration, Instant}; +use std::collections::{HashMap, HashSet}; use futures::sync::mpsc; use parking_lot::Mutex; +use libp2p::{ multihash::Multihash, Multiaddr}; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; @@ -47,6 +48,7 @@ use sr_primitives::traits::{Header, NumberFor, SaturatedConversion, Zero}; use substrate_executor::NativeExecutor; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; +use network::{Event, DhtEvent}; pub use self::error::Error; pub use config::{Configuration, Roles, PruningMode}; @@ -431,7 +433,9 @@ impl Service { client.clone(), network_status_sinks.clone(), system_rpc_rx, - has_bootnodes + has_bootnodes, + // TODO: Public key might be , handle that! + public_key.clone(), ) .map_err(|_| ()) .select(exit.clone()) @@ -637,11 +641,17 @@ fn build_network_future< status_sinks: Arc>, NetworkState)>>>>, mut rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, + public_key: String, ) -> impl Future { // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + + let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); + let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); + let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); + let mut imported_blocks_stream = client.import_notification_stream().fuse() .map(|v| Ok::<_, ()>(v)).compat(); let mut finality_notification_stream = client.finality_notification_stream().fuse() @@ -655,6 +665,24 @@ fn build_network_future< network.on_block_imported(notification.hash, notification.header); } + while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { + let external_addresses = network.external_addresses(); + + println!("==== external addresses: {:?}", external_addresses); + + // println!("network state: {:?}", network.network_state()); + + // TODO: Remove unwrap. + let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); + + network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); + + // TODO: Let's trigger a search for us for now. Remove. + network.service().get_value(&hashed_public_key.clone()); + } + + + // We poll `finality_notification_stream`, but we only take the last event. let mut last = None; while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { @@ -707,11 +735,16 @@ fn build_network_future< status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); } - // Main network polling. - match network.poll() { - Ok(Async::NotReady) => {} - Err(err) => warn!(target: "service", "Error in network: {:?}", err), - Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"), + while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + for (key, value) in values.iter() { + let value = std::str::from_utf8(value).unwrap(); + + let external_addresses: HashSet = serde_json::from_str(value).unwrap(); + + println!("==== Found the following external addresses on the DHT: {:?}", external_addresses); + } } // Now some diagnostic for performances. From 14c93da19008464b42de9b84295b54c04e2daf65 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 19 Jul 2019 09:37:31 +0200 Subject: [PATCH 02/11] core/service: Add trait abst to build_network_future to call runtime Instead of building the network future via an independent function `build_network_future`, follow the convention introduced through `components.rs/OffchainWorker` and have `build_network_future` implement a trait. --- core/service/src/components.rs | 157 ++++++++++++++++++++++++++++++++- core/service/src/lib.rs | 9 +- 2 files changed, 163 insertions(+), 3 deletions(-) diff --git a/core/service/src/components.rs b/core/service/src/components.rs index dff6161f16523..179f0e2e4ac38 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -19,9 +19,18 @@ use std::{sync::Arc, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; +use std::time::Duration; +use log::{info, warn, debug}; +use std::collections::{HashMap, HashSet}; +use libp2p::{ multihash::Multihash, Multiaddr}; +use parking_lot::Mutex; +use client::{BlockchainEvents, backend::Backend}; +use futures03::stream::{StreamExt as _, TryStreamExt as _}; + +use network::{Event, DhtEvent}; use client_db; use client::{self, Client, runtime_api}; -use crate::{error, Service, AuthorityKeyProvider}; +use crate::{error, Service, AuthorityKeyProvider, NetworkStatus}; use consensus_common::{import_queue::ImportQueue, SelectChain}; use network::{self, OnDemand, FinalityProofProvider, NetworkStateInfo, config::BoxFinalityProofRequestBuilder}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; @@ -29,6 +38,8 @@ use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as use sr_primitives::{ BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId }; + +use network::NetworkState; use crate::config::Configuration; use primitives::{Blake2Hasher, H256, Pair}; use rpc::{self, apis::system::SystemInfo}; @@ -268,12 +279,155 @@ impl OffchainWorker for C where } } +pub trait TestRuntime { + fn test_runtime< + H: network::ExHashT, + S:network::specialization::NetworkSpecialization> , + >( + network: network::NetworkWorker, S, H >, + client: Arc>, + status_sinks: Arc>, NetworkState)>>>>, + rpc_rx: mpsc::UnboundedReceiver>>, + should_have_peers: bool, + public_key: String, + )-> Box+ Send> ; +} + +impl TestRuntime for C where + ComponentClient: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: runtime_api::Metadata>, +{ + fn test_runtime< + H: network::ExHashT, + S:network::specialization::NetworkSpecialization> , + >( + mut network: network::NetworkWorker, S, H>, + client: Arc>, + mut status_sinks: Arc>, NetworkState)>>>>, + mut rpc_rx: mpsc::UnboundedReceiver>>, + should_have_peers: bool, + public_key: String, + )-> Box + Send> { + // Interval at which we send status updates on the status stream. + const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + + client.runtime_api().validators(); + + + let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); + + let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); + + let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); + + let mut imported_blocks_stream = client.import_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); + let mut finality_notification_stream = client.finality_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); + + Box::new(futures::future::poll_fn(move || { + // We poll `imported_blocks_stream`. + while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { + network.on_block_imported(notification.hash, notification.header); + } + + while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { + let external_addresses = network.external_addresses(); + + println!("==== external addresses: {:?}", external_addresses); + + // println!("network state: {:?}", network.network_state()); + + // TODO: Remove unwrap. + let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); + + network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); + + // TODO: Let's trigger a search for us for now. Remove. + network.service().get_value(&hashed_public_key.clone()); + } + + + + // We poll `finality_notification_stream`, but we only take the last event. + let mut last = None; + while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { + last = Some(item); + } + if let Some(notification) = last { + network.on_block_finalized(notification.hash, notification.header); + } + + // Poll the RPC requests and answer them. + while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { + match request { + rpc::apis::system::Request::Health(sender) => { + let _ = sender.send(rpc::apis::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.service().is_major_syncing(), + should_have_peers, + }); + }, + rpc::apis::system::Request::Peers(sender) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| + rpc::apis::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ).collect()); + } + rpc::apis::system::Request::NetworkState(sender) => { + let _ = sender.send(network.network_state()); + } + }; + } + + // Interval report for the external API. + while let Ok(Async::Ready(_)) = status_interval.poll() { + let status = NetworkStatus { + sync_state: network.sync_state(), + best_seen_block: network.best_seen_block(), + num_sync_peers: network.num_sync_peers(), + num_connected_peers: network.num_connected_peers(), + num_active_peers: network.num_active_peers(), + average_download_per_sec: network.average_download_per_sec(), + average_upload_per_sec: network.average_upload_per_sec(), + }; + let state = network.network_state(); + + status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); + } + + // Main network polling. + while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + for (key, value) in values.iter() { + let value = std::str::from_utf8(value).unwrap(); + + let external_addresses: HashSet = serde_json::from_str(value).unwrap(); + + println!("==== Found the following external addresses on the DHT: {:?}", external_addresses); + } + } + + Ok(Async::NotReady) + + })) + } + +} + /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> + Send + 'static + StartRPC + + TestRuntime + MaintainTransactionPool + OffchainWorker {} @@ -282,6 +436,7 @@ impl ServiceTrait for T where + Send + 'static + StartRPC + + TestRuntime + MaintainTransactionPool + OffchainWorker {} diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 415a941c5eda0..573e3bd198f72 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -33,6 +33,9 @@ use std::collections::{HashMap, HashSet}; use futures::sync::mpsc; use parking_lot::Mutex; use libp2p::{ multihash::Multihash, Multiaddr}; +use runtime_primitives::traits::{ProvideRuntimeApi}; +use client::{self, Client, runtime_api}; +use crate::components::TestRuntime; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; @@ -211,6 +214,7 @@ impl Service { } let (client, on_demand) = Components::build_client(&config, executor)?; + let select_chain = Components::build_select_chain(&mut config, client.clone())?; let (import_queue, finality_proof_request_builder) = Components::build_import_queue( &mut config, @@ -428,7 +432,7 @@ impl Service { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers::(&config, gen_handler)?; - let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future::( + let _ = to_spawn_tx.unbounded_send(Box::new(Components::RuntimeServices::test_runtime( network_mut, client.clone(), network_status_sinks.clone(), @@ -628,12 +632,13 @@ impl Executor + Send>> } } + /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. fn build_network_future< Components: components::Components, - S: network::specialization::NetworkSpecialization>, + S:network::specialization::NetworkSpecialization> , H: network::ExHashT > ( mut network: network::NetworkWorker, S, H>, From 2d6795619a7eacdf82c3d3d7706309ddf7a477cc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 19 Jul 2019 15:47:56 +0200 Subject: [PATCH 03/11] core/session: Declare + implement runtime SessionApi The runtime SessionApi enables users to retrieve the ids of the current set of validators from the srml session module. --- Cargo.lock | 14 +- core/service/Cargo.toml | 1 + core/service/src/components.rs | 200 +++++++++++++++-------------- core/service/src/lib.rs | 11 +- core/session/primitives/Cargo.toml | 21 +++ core/session/primitives/src/lib.rs | 29 +++++ node/cli/src/service.rs | 5 +- node/primitives/src/lib.rs | 3 + node/runtime/Cargo.toml | 2 + node/runtime/src/lib.rs | 2 +- 10 files changed, 185 insertions(+), 103 deletions(-) create mode 100644 core/session/primitives/Cargo.toml create mode 100644 core/session/primitives/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 841ab3d8cacda..4512a336b0305 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2370,6 +2370,7 @@ dependencies = [ "substrate-keyring 2.0.0", "substrate-offchain-primitives 2.0.0", "substrate-primitives 2.0.0", + "substrate-session-primitives 2.0.0", "substrate-wasm-builder-runner 1.0.2", ] @@ -4747,8 +4748,8 @@ dependencies = [ "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "node-executor 2.0.0", "node-primitives 2.0.0", "node-runtime 2.0.0", @@ -4770,6 +4771,7 @@ dependencies = [ "substrate-offchain 2.0.0", "substrate-primitives 2.0.0", "substrate-rpc-servers 2.0.0", + "substrate-session-primitives 2.0.0", "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", "substrate-transaction-pool 2.0.0", @@ -4797,6 +4799,16 @@ dependencies = [ "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-session-primitives" +version = "2.0.0" +dependencies = [ + "parity-codec 4.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 2.0.0", + "sr-std 2.0.0", + "substrate-client 2.0.0", +] + [[package]] name = "substrate-state-db" version = "2.0.0" diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index e3e54b68e54aa..125c6d6de549d 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -35,6 +35,7 @@ tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } libp2p = { version = "0.11.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } +session_primitives = { package = "substrate-session-primitives", path = "../../core/session/primitives", default-features = false } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 179f0e2e4ac38..41cc8c426b1bd 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -20,12 +20,13 @@ use std::{sync::Arc, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; use std::time::Duration; -use log::{info, warn, debug}; -use std::collections::{HashMap, HashSet}; -use libp2p::{ multihash::Multihash, Multiaddr}; +use log::warn; +use std::collections::HashSet; +use libp2p::Multiaddr; use parking_lot::Mutex; -use client::{BlockchainEvents, backend::Backend}; +use client::{BlockchainEvents}; use futures03::stream::{StreamExt as _, TryStreamExt as _}; +use session_primitives::SessionApi; use network::{Event, DhtEvent}; use client_db; @@ -280,27 +281,28 @@ impl OffchainWorker for C where } pub trait TestRuntime { - fn test_runtime< - H: network::ExHashT, - S:network::specialization::NetworkSpecialization> , - >( + fn test_runtime< + H: network::ExHashT, + S:network::specialization::NetworkSpecialization> , + >( network: network::NetworkWorker, S, H >, client: Arc>, status_sinks: Arc>, NetworkState)>>>>, rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, public_key: String, - )-> Box+ Send> ; + )-> Box+ Send> ; } impl TestRuntime for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::Metadata>, + as ProvideRuntimeApi>::Api: session_primitives::SessionApi, ::AuthorityId>, { - fn test_runtime< - H: network::ExHashT, - S:network::specialization::NetworkSpecialization> , - >( + fn test_runtime< + H: network::ExHashT, + S:network::specialization::NetworkSpecialization> , + >( mut network: network::NetworkWorker, S, H>, client: Arc>, mut status_sinks: Arc>, NetworkState)>>>>, @@ -308,115 +310,117 @@ impl TestRuntime for C where should_have_peers: bool, public_key: String, )-> Box + Send> { - // Interval at which we send status updates on the status stream. - const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + // Interval at which we send status updates on the status stream. + const STATUS_INTERVAL: Duration = Duration::from_millis(5000); - client.runtime_api().validators(); + let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); - let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); + let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); - let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); + let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); - let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); + let mut imported_blocks_stream = client.import_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); + let mut finality_notification_stream = client.finality_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); - let mut imported_blocks_stream = client.import_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - let mut finality_notification_stream = client.finality_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); + Box::new(futures::future::poll_fn(move || { + // We poll `imported_blocks_stream`. + while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { + network.on_block_imported(notification.hash, notification.header); + } - Box::new(futures::future::poll_fn(move || { - // We poll `imported_blocks_stream`. - while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { - network.on_block_imported(notification.hash, notification.header); - } + while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { + println!("==== public key: {:?}", public_key); + let external_addresses = network.external_addresses(); - while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { - let external_addresses = network.external_addresses(); + println!("==== external addresses: {:?}", external_addresses); - println!("==== external addresses: {:?}", external_addresses); + // println!("network state: {:?}", network.network_state()); - // println!("network state: {:?}", network.network_state()); + // TODO: Remove unwrap. + let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); - // TODO: Remove unwrap. - let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); + network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); - network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); + let id = BlockId::hash( client.info().chain.best_hash); + println!("=== validators: {:?}", client.runtime_api().validators(&id)); - // TODO: Let's trigger a search for us for now. Remove. - network.service().get_value(&hashed_public_key.clone()); - } + // TODO: Let's trigger a search for us for now. Remove. + network.service().get_value(&hashed_public_key.clone()); + } - // We poll `finality_notification_stream`, but we only take the last event. - let mut last = None; - while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { - last = Some(item); - } - if let Some(notification) = last { - network.on_block_finalized(notification.hash, notification.header); - } + // We poll `finality_notification_stream`, but we only take the last event. + let mut last = None; + while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { + last = Some(item); + } + if let Some(notification) = last { + network.on_block_finalized(notification.hash, notification.header); + } - // Poll the RPC requests and answer them. - while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { - match request { - rpc::apis::system::Request::Health(sender) => { - let _ = sender.send(rpc::apis::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.service().is_major_syncing(), - should_have_peers, - }); - }, - rpc::apis::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - rpc::apis::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - ).collect()); - } - rpc::apis::system::Request::NetworkState(sender) => { - let _ = sender.send(network.network_state()); - } - }; - } + // Poll the RPC requests and answer them. + while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { + match request { + rpc::apis::system::Request::Health(sender) => { + let _ = sender.send(rpc::apis::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.service().is_major_syncing(), + should_have_peers, + }); + }, + rpc::apis::system::Request::Peers(sender) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| + rpc::apis::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ).collect()); + } + rpc::apis::system::Request::NetworkState(sender) => { + let _ = sender.send(network.network_state()); + } + }; + } - // Interval report for the external API. - while let Ok(Async::Ready(_)) = status_interval.poll() { - let status = NetworkStatus { - sync_state: network.sync_state(), - best_seen_block: network.best_seen_block(), - num_sync_peers: network.num_sync_peers(), - num_connected_peers: network.num_connected_peers(), - num_active_peers: network.num_active_peers(), - average_download_per_sec: network.average_download_per_sec(), - average_upload_per_sec: network.average_upload_per_sec(), - }; - let state = network.network_state(); - - status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); - } + // Interval report for the external API. + while let Ok(Async::Ready(_)) = status_interval.poll() { + let status = NetworkStatus { + sync_state: network.sync_state(), + best_seen_block: network.best_seen_block(), + num_sync_peers: network.num_sync_peers(), + num_connected_peers: network.num_connected_peers(), + num_active_peers: network.num_active_peers(), + average_download_per_sec: network.average_download_per_sec(), + average_upload_per_sec: network.average_upload_per_sec(), + }; + let state = network.network_state(); + + status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); + } - // Main network polling. - while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { + // Main network polling. + while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err); - }) { - for (key, value) in values.iter() { - let value = std::str::from_utf8(value).unwrap(); + }) { + for (key, value) in values.iter() { + let value = std::str::from_utf8(value).unwrap(); - let external_addresses: HashSet = serde_json::from_str(value).unwrap(); + let external_addresses: HashSet = serde_json::from_str(value).unwrap(); - println!("==== Found the following external addresses on the DHT: {:?}", external_addresses); + println!("==== Found the following external addresses on the DHT: {:?}", external_addresses); + } } - } - Ok(Async::NotReady) + Ok(Async::NotReady) - })) + })) } } @@ -476,6 +480,8 @@ pub trait ServiceFactory: 'static + Sized { type LightImportQueue: ImportQueue + 'static; /// The Fork Choice Strategy for the chain type SelectChain: SelectChain + 'static; + /// + type AuthorityId: parity_codec::Codec + std::fmt::Debug; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 573e3bd198f72..6061299abc7a9 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -32,9 +32,8 @@ use std::time::{Duration, Instant}; use std::collections::{HashMap, HashSet}; use futures::sync::mpsc; use parking_lot::Mutex; -use libp2p::{ multihash::Multihash, Multiaddr}; -use runtime_primitives::traits::{ProvideRuntimeApi}; -use client::{self, Client, runtime_api}; +use libp2p::Multiaddr; +use client::{self}; use crate::components::TestRuntime; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; @@ -636,6 +635,7 @@ impl Executor + Send>> /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. +// TODO delete. fn build_network_future< Components: components::Components, S:network::specialization::NetworkSpecialization> , @@ -651,6 +651,7 @@ fn build_network_future< // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + println!("==== public key {}", public_key); let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); @@ -671,6 +672,7 @@ fn build_network_future< } while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { + println!("==== public key {}", public_key); let external_addresses = network.external_addresses(); println!("==== external addresses: {:?}", external_addresses); @@ -1120,6 +1122,8 @@ macro_rules! construct_service_factory { SelectChain = $select_chain:ty { $( $select_chain_init:tt )* }, FinalityProofProvider = { $( $finality_proof_provider_init:tt )* }, + AuthorityId = $authority_id:ty, + } ) => { $( #[$attr] )* @@ -1142,6 +1146,7 @@ macro_rules! construct_service_factory { type FullImportQueue = $full_import_queue; type LightImportQueue = $light_import_queue; type SelectChain = $select_chain; + type AuthorityId = $authority_id; fn build_full_transaction_pool( config: $crate::TransactionPoolOptions, diff --git a/core/session/primitives/Cargo.toml b/core/session/primitives/Cargo.toml new file mode 100644 index 0000000000000..dd573e40c122d --- /dev/null +++ b/core/session/primitives/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "substrate-session-primitives" +version = "2.0.0" +authors = ["Parity Technologies "] +description = "Primitives for session" +edition = "2018" + +[dependencies] +parity-codec = { version = "4.1.1", default-features = false } +client = { package = "substrate-client", path = "../../client", default-features = false } +runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives", default-features = false } +rstd = { package = "sr-std", path = "../../sr-std", default-features = false } + +[features] +default = ["std"] +std = [ + "rstd/std", + "client/std", + "parity-codec/std", + "runtime_primitives/std" +] diff --git a/core/session/primitives/src/lib.rs b/core/session/primitives/src/lib.rs new file mode 100644 index 0000000000000..7c9d210d768de --- /dev/null +++ b/core/session/primitives/src/lib.rs @@ -0,0 +1,29 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! The Session runtime api primitives. + +#![cfg_attr(not(feature = "std"), no_std)] + +use parity_codec::{Codec}; +use client::decl_runtime_apis; +use rstd::vec::Vec; + +decl_runtime_apis! { + pub trait SessionApi { + fn validators() -> Vec; + } +} diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 6c45f45d008cb..2e8723c8632f6 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -29,7 +29,7 @@ use node_executor; use primitives::Pair; use grandpa_primitives::AuthorityPair as GrandpaPair; use futures::prelude::*; -use node_primitives::Block; +use node_primitives::{Block, ValidatorId}; use node_runtime::{GenesisConfig, RuntimeApi}; use substrate_service::{ FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, @@ -253,6 +253,9 @@ construct_service_factory! { FinalityProofProvider = { |client: Arc>| { Ok(Some(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _)) }}, + // TODO: Sure this is supposed to be called AuthorityId? SessionId and AuthorityId are not the same according to + // the srml session modul. + AuthorityId = ValidatorId, } } diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 0895a8675d56d..ebba08615a57d 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -44,6 +44,9 @@ pub type Balance = u128; /// Type used for expressing timestamp. pub type Moment = u64; +/// Identity of Validator. +pub type ValidatorId = primitives::sr25519::Public; + /// Index of a transaction in the chain. pub type Index = u64; diff --git a/node/runtime/Cargo.toml b/node/runtime/Cargo.toml index c40e6d779644c..774097b530b57 100644 --- a/node/runtime/Cargo.toml +++ b/node/runtime/Cargo.toml @@ -37,6 +37,7 @@ treasury = { package = "srml-treasury", path = "../../srml/treasury", default-fe sudo = { package = "srml-sudo", path = "../../srml/sudo", default-features = false } im-online = { package = "srml-im-online", path = "../../srml/im-online", default-features = false } node-primitives = { path = "../primitives", default-features = false } +session_primitives = { package = "substrate-session-primitives", path = "../../core/session/primitives", default-features = false } rustc-hex = { version = "2.0", optional = true } serde = { version = "1.0", optional = true } substrate-keyring = { path = "../../core/keyring", optional = true } @@ -79,6 +80,7 @@ std = [ "serde", "safe-mix/std", "client/std", + "session_primitives/std", "rustc-hex", "substrate-keyring", "offchain-primitives/std", diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index 6be19f02ae000..db8c2d692ef55 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -26,7 +26,7 @@ use support::{ }; use primitives::u32_trait::{_1, _2, _3, _4}; use node_primitives::{ - AccountId, AccountIndex, Balance, BlockNumber, Hash, Index, + AccountId, AccountIndex, ValidatorId, Balance, BlockNumber, Hash, Index, Moment, Signature, }; use babe::{AuthorityId as BabeId}; From cf35473c016e76102733c04c06bfc2bb3d650497 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 24 Jul 2019 10:15:47 +0200 Subject: [PATCH 04/11] core/service: Use AuthorityKeyProvider For a node to be able to learn its own authority id (public key) and to be able to sign payloads for the DHT, the node needs to retrieve its authority key. This is done via AuthorityKeyProvider. --- Cargo.lock | 1 + core/client/src/runtime_api.rs | 6 +++++- core/keystore/src/lib.rs | 34 ++++++++++++++++++++++++++++++++++ core/service/Cargo.toml | 1 + core/service/src/components.rs | 23 +++++++++++++++++++---- core/service/src/lib.rs | 11 ++++++++++- node/cli/src/chain_spec.rs | 9 ++++++--- node/cli/src/service.rs | 6 +++--- node/runtime/src/lib.rs | 8 +++++++- 9 files changed, 86 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4512a336b0305..3f154f671f2a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4763,6 +4763,7 @@ dependencies = [ "sr-primitives 2.0.0", "substrate-client 2.0.0", "substrate-client-db 2.0.0", + "substrate-consensus-aura-primitives 2.0.0", "substrate-consensus-common 2.0.0", "substrate-executor 2.0.0", "substrate-finality-grandpa 2.0.0", diff --git a/core/client/src/runtime_api.rs b/core/client/src/runtime_api.rs index 890e265878dba..5172c6f1cdc9b 100644 --- a/core/client/src/runtime_api.rs +++ b/core/client/src/runtime_api.rs @@ -183,5 +183,9 @@ decl_runtime_apis! { /// Validate the given transaction. fn validate_transaction(tx: ::Extrinsic) -> TransactionValidity; } -} + // TODO: This doesn't belong here. + pub trait KeyTypeGetter { + fn get_key_type() -> primitives::crypto::KeyTypeId; + } +} diff --git a/core/keystore/src/lib.rs b/core/keystore/src/lib.rs index 89cfca559c650..325ddb5488d46 100644 --- a/core/keystore/src/lib.rs +++ b/core/keystore/src/lib.rs @@ -86,6 +86,7 @@ impl Store { /// Generate a new key, placing it into the store. pub fn generate(&self, password: &str) -> Result { + println!("==== generating a new key inside keystore, password: {}", password); let (pair, phrase, _) = TPair::generate_with_phrase(Some(password)); let mut file = File::create(self.key_file_path::(&pair.public()))?; ::serde_json::to_writer(&file, &phrase)?; @@ -95,6 +96,7 @@ impl Store { /// Create a new key from seed. Do not place it into the store. pub fn generate_from_seed(&mut self, seed: &str) -> Result { + println!("==== generating a new key inside keystore from seed: {}", seed); let pair = TPair::from_string(seed, None) .ok().ok_or(Error::InvalidSeed)?; self.insert_pair(&pair); @@ -151,6 +153,38 @@ impl Store { Ok(public_keys) } + // // TODO: This function duplicates most of the logic of contents(). Can they share logic? + // pub fn contents_by_type_id(&self, id: KeyTypeId) -> Result>> { + // let mut public_keys: Vec> = self.additional.keys() + // .filter_map(|(ty, public)| { + // if *ty != id { + // return None + // } + // Some(public.to_vec()) + // }) + // .collect(); + + // let key_type: [u8; 4] = id.to_le_bytes(); + // for entry in fs::read_dir(&self.path)? { + // let entry = entry?; + // let path = entry.path(); + + // // skip directories and non-unicode file names (hex is unicode) + // if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + // match hex::decode(name) { + // Ok(ref hex) => { + // if hex[0..4] != key_type { continue } + // let public = &hex[4..]; + // public_keys.push(public.to_vec()); + // } + // _ => continue, + // } + // } + // } + + // Ok(public_keys) + // } + fn key_file_path(&self, public: &TPair::Public) -> PathBuf { let mut buf = self.path.clone(); let bytes: [u8; 4] = TPair::KEY_TYPE.to_le_bytes(); diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 125c6d6de549d..4a9fa30721186 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -36,6 +36,7 @@ offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } libp2p = { version = "0.11.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } session_primitives = { package = "substrate-session-primitives", path = "../../core/session/primitives", default-features = false } +consensus_aura_primitives = { package = "substrate-consensus-aura-primitives", path = "../../core/consensus/aura/primitives", default-features = false } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 41cc8c426b1bd..9b3d7e82d384f 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -27,6 +27,8 @@ use parking_lot::Mutex; use client::{BlockchainEvents}; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use session_primitives::SessionApi; +use consensus_aura_primitives::AuraApi; +use offchain::AuthorityKeyProvider as _; use network::{Event, DhtEvent}; use client_db; @@ -42,9 +44,10 @@ use sr_primitives::{ use network::NetworkState; use crate::config::Configuration; -use primitives::{Blake2Hasher, H256, Pair}; +use primitives::{Blake2Hasher, H256, Pair, Public}; use rpc::{self, apis::system::SystemInfo}; use futures::{prelude::*, future::Executor, sync::mpsc}; +use runtime_api::KeyTypeGetter; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. @@ -290,14 +293,19 @@ pub trait TestRuntime { status_sinks: Arc>, NetworkState)>>>>, rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, + // TODO: still needed? public_key: String, + keystore: ComponentAuthorityKeyProvider, )-> Box+ Send> ; } impl TestRuntime for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::Metadata>, - as ProvideRuntimeApi>::Api: session_primitives::SessionApi, ::AuthorityId>, + as ProvideRuntimeApi>::Api: runtime_api::KeyTypeGetter>, + // as ProvideRuntimeApi>::Api: session_primitives::SessionApi, ::AuthorityId>, +// as ProvideRuntimeApi>::Api: consensus_aura_primitives::AuraApi, ::AuthorityId>, +<<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString { fn test_runtime< H: network::ExHashT, @@ -305,10 +313,11 @@ impl TestRuntime for C where >( mut network: network::NetworkWorker, S, H>, client: Arc>, - mut status_sinks: Arc>, NetworkState)>>>>, + status_sinks: Arc>, NetworkState)>>>>, mut rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, public_key: String, + keystore: ComponentAuthorityKeyProvider, )-> Box + Send> { // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); @@ -345,7 +354,13 @@ impl TestRuntime for C where network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); let id = BlockId::hash( client.info().chain.best_hash); - println!("=== validators: {:?}", client.runtime_api().validators(&id)); + // TODO: These seem to be stash keys, not public keys. + // println!("=== validators: {:?}", client.runtime_api().authorities(&id)); + + // TODO: Remove. + // println!("==== KeyTypeId: {}", client.runtime_api().get_key_type(&id)); + + println!("=== authority key: {}", keystore.authority_key( &id).map(|k| k.public().to_string()).unwrap()); // TODO: Let's trigger a search for us for now. Remove. network.service().get_value(&hashed_public_key.clone()); diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 6061299abc7a9..419e8afd87b4b 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -195,6 +195,7 @@ impl Service { // FIXME #1063 remove this if let Some(keystore) = keystore.as_mut() { for seed in &config.keys { + println!("==== seed: {}", seed); keystore.generate_from_seed::(seed)?; keystore.generate_from_seed::(seed)?; } @@ -207,7 +208,14 @@ impl Service { info!("Generated a new keypair: {:?}", public_key); public_key.to_string() } - } + }; + + match keystore.contents::()?.get(0) { + Some(public_key) => println!("====== Second public key: {}", public_key.to_string()), + None => { + println!("======= no second public key"); + } + }; } else { public_key = format!(""); } @@ -439,6 +447,7 @@ impl Service { has_bootnodes, // TODO: Public key might be , handle that! public_key.clone(), + keystore_authority_key.clone(), ) .map_err(|_| ()) .select(exit.clone()) diff --git a/node/cli/src/chain_spec.rs b/node/cli/src/chain_spec.rs index d2fd0cfbc5100..7dc3e28761b43 100644 --- a/node/cli/src/chain_spec.rs +++ b/node/cli/src/chain_spec.rs @@ -331,11 +331,14 @@ pub fn development_config() -> ChainSpec { } fn local_testnet_genesis() -> GenesisConfig { - testnet_genesis( - vec![ + let authorities = vec![ get_authority_keys_from_seed("Alice"), get_authority_keys_from_seed("Bob"), - ], + ]; + + println!("==== authorities inside node chainspec: {:?}", authorities); + testnet_genesis( + authorities, get_account_id_from_seed("Alice"), None, false, diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 2e8723c8632f6..4782904f47306 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -22,14 +22,13 @@ use std::sync::Arc; use std::time::Duration; use babe::{import_queue, start_babe, BabeImportQueue, Config}; -use babe_primitives::AuthorityPair as BabePair; +use babe_primitives::{AuthorityPair as BabePair, AuthorityId}; use client::{self, LongestChain}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use node_executor; use primitives::Pair; use grandpa_primitives::AuthorityPair as GrandpaPair; use futures::prelude::*; -use node_primitives::{Block, ValidatorId}; use node_runtime::{GenesisConfig, RuntimeApi}; use substrate_service::{ FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, @@ -42,6 +41,7 @@ use network::construct_simple_protocol; use substrate_service::construct_service_factory; use log::info; use substrate_service::TelemetryOnConnect; +use node_primitives::{Block}; construct_simple_protocol! { /// Demo protocol attachment for substrate. @@ -255,7 +255,7 @@ construct_service_factory! { }}, // TODO: Sure this is supposed to be called AuthorityId? SessionId and AuthorityId are not the same according to // the srml session modul. - AuthorityId = ValidatorId, + AuthorityId = AuthorityId, } } diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index db8c2d692ef55..7072c50ec5c60 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -26,7 +26,7 @@ use support::{ }; use primitives::u32_trait::{_1, _2, _3, _4}; use node_primitives::{ - AccountId, AccountIndex, ValidatorId, Balance, BlockNumber, Hash, Index, + AccountId, AccountIndex, Balance, BlockNumber, Hash, Index, Moment, Signature, }; use babe::{AuthorityId as BabeId}; @@ -493,6 +493,12 @@ impl_runtime_apis! { } } + impl client_api::KeyTypeGetter for Runtime { + fn get_key_type() -> substrate_primitives::crypto::KeyTypeId { + substrate_primitives::crypto::key_types::ED25519 + } + } + impl offchain_primitives::OffchainWorkerApi for Runtime { fn offchain_worker(number: NumberFor) { Executive::offchain_worker(number) From 273e8eb65f8007c6708ea8af2b1765d754a9271a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 24 Jul 2019 14:36:05 +0200 Subject: [PATCH 05/11] Cleanup --- core/client/src/runtime_api.rs | 5 - core/keystore/src/lib.rs | 34 ------ core/service/src/components.rs | 63 ++++++------ core/service/src/lib.rs | 159 +---------------------------- core/session/primitives/src/lib.rs | 1 + node/cli/src/chain_spec.rs | 9 +- node/runtime/src/lib.rs | 6 -- 7 files changed, 42 insertions(+), 235 deletions(-) diff --git a/core/client/src/runtime_api.rs b/core/client/src/runtime_api.rs index 5172c6f1cdc9b..e87cd51863475 100644 --- a/core/client/src/runtime_api.rs +++ b/core/client/src/runtime_api.rs @@ -183,9 +183,4 @@ decl_runtime_apis! { /// Validate the given transaction. fn validate_transaction(tx: ::Extrinsic) -> TransactionValidity; } - - // TODO: This doesn't belong here. - pub trait KeyTypeGetter { - fn get_key_type() -> primitives::crypto::KeyTypeId; - } } diff --git a/core/keystore/src/lib.rs b/core/keystore/src/lib.rs index 325ddb5488d46..89cfca559c650 100644 --- a/core/keystore/src/lib.rs +++ b/core/keystore/src/lib.rs @@ -86,7 +86,6 @@ impl Store { /// Generate a new key, placing it into the store. pub fn generate(&self, password: &str) -> Result { - println!("==== generating a new key inside keystore, password: {}", password); let (pair, phrase, _) = TPair::generate_with_phrase(Some(password)); let mut file = File::create(self.key_file_path::(&pair.public()))?; ::serde_json::to_writer(&file, &phrase)?; @@ -96,7 +95,6 @@ impl Store { /// Create a new key from seed. Do not place it into the store. pub fn generate_from_seed(&mut self, seed: &str) -> Result { - println!("==== generating a new key inside keystore from seed: {}", seed); let pair = TPair::from_string(seed, None) .ok().ok_or(Error::InvalidSeed)?; self.insert_pair(&pair); @@ -153,38 +151,6 @@ impl Store { Ok(public_keys) } - // // TODO: This function duplicates most of the logic of contents(). Can they share logic? - // pub fn contents_by_type_id(&self, id: KeyTypeId) -> Result>> { - // let mut public_keys: Vec> = self.additional.keys() - // .filter_map(|(ty, public)| { - // if *ty != id { - // return None - // } - // Some(public.to_vec()) - // }) - // .collect(); - - // let key_type: [u8; 4] = id.to_le_bytes(); - // for entry in fs::read_dir(&self.path)? { - // let entry = entry?; - // let path = entry.path(); - - // // skip directories and non-unicode file names (hex is unicode) - // if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - // match hex::decode(name) { - // Ok(ref hex) => { - // if hex[0..4] != key_type { continue } - // let public = &hex[4..]; - // public_keys.push(public.to_vec()); - // } - // _ => continue, - // } - // } - // } - - // Ok(public_keys) - // } - fn key_file_path(&self, public: &TPair::Public) -> PathBuf { let mut buf = self.path.clone(); let bytes: [u8; 4] = TPair::KEY_TYPE.to_le_bytes(); diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 9b3d7e82d384f..065e94e39a095 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -19,14 +19,13 @@ use std::{sync::Arc, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; -use std::time::Duration; -use log::warn; +use std::time::{Duration, Instant}; +use log::{log, warn, Level}; use std::collections::HashSet; use libp2p::Multiaddr; use parking_lot::Mutex; use client::{BlockchainEvents}; use futures03::stream::{StreamExt as _, TryStreamExt as _}; -use session_primitives::SessionApi; use consensus_aura_primitives::AuraApi; use offchain::AuthorityKeyProvider as _; @@ -44,10 +43,9 @@ use sr_primitives::{ use network::NetworkState; use crate::config::Configuration; -use primitives::{Blake2Hasher, H256, Pair, Public}; +use primitives::{Blake2Hasher, H256, Pair}; use rpc::{self, apis::system::SystemInfo}; use futures::{prelude::*, future::Executor, sync::mpsc}; -use runtime_api::KeyTypeGetter; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. @@ -283,8 +281,8 @@ impl OffchainWorker for C where } } -pub trait TestRuntime { - fn test_runtime< +pub trait NetworkFutureBuilder { + fn build_network_future< H: network::ExHashT, S:network::specialization::NetworkSpecialization> , >( @@ -294,20 +292,19 @@ pub trait TestRuntime { rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, // TODO: still needed? - public_key: String, keystore: ComponentAuthorityKeyProvider, )-> Box+ Send> ; } -impl TestRuntime for C where +impl NetworkFutureBuilder for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::Metadata>, - as ProvideRuntimeApi>::Api: runtime_api::KeyTypeGetter>, + // as ProvideRuntimeApi>::Api: runtime_api::KeyTypeGetter>, // as ProvideRuntimeApi>::Api: session_primitives::SessionApi, ::AuthorityId>, // as ProvideRuntimeApi>::Api: consensus_aura_primitives::AuraApi, ::AuthorityId>, <<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString { - fn test_runtime< + fn build_network_future< H: network::ExHashT, S:network::specialization::NetworkSpecialization> , >( @@ -316,15 +313,11 @@ impl TestRuntime for C where status_sinks: Arc>, NetworkState)>>>>, mut rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, - public_key: String, keystore: ComponentAuthorityKeyProvider, )-> Box + Send> { // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); - - let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); - let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); @@ -335,33 +328,34 @@ impl TestRuntime for C where .map(|v| Ok::<_, ()>(v)).compat(); Box::new(futures::future::poll_fn(move || { + let before_polling = Instant::now(); + // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { network.on_block_imported(notification.hash, notification.header); } while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { - println!("==== public key: {:?}", public_key); - let external_addresses = network.external_addresses(); + let id = BlockId::hash( client.info().chain.best_hash); - println!("==== external addresses: {:?}", external_addresses); + // TODO: remove unwrap(). + let public_key = keystore.authority_key( &id).map(|k| k.public().to_string()).unwrap(); + println!("=== authority key: {}", public_key); + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + &public_key.as_bytes(), + ).unwrap(); - // println!("network state: {:?}", network.network_state()); + let external_addresses = network.external_addresses(); + println!("==== external addresses: {:?}", external_addresses); // TODO: Remove unwrap. let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); - - let id = BlockId::hash( client.info().chain.best_hash); - // TODO: These seem to be stash keys, not public keys. + // TODO: This gets the Aura authorities, what if a user uses babe? That should be abstracted. // println!("=== validators: {:?}", client.runtime_api().authorities(&id)); - // TODO: Remove. - // println!("==== KeyTypeId: {}", client.runtime_api().get_key_type(&id)); - - println!("=== authority key: {}", keystore.authority_key( &id).map(|k| k.public().to_string()).unwrap()); - // TODO: Let's trigger a search for us for now. Remove. network.service().get_value(&hashed_public_key.clone()); } @@ -424,7 +418,7 @@ impl TestRuntime for C where while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err); }) { - for (key, value) in values.iter() { + for (_key, value) in values.iter() { let value = std::str::from_utf8(value).unwrap(); let external_addresses: HashSet = serde_json::from_str(value).unwrap(); @@ -433,6 +427,15 @@ impl TestRuntime for C where } } + // Now some diagnostic for performances. + let polling_dur = before_polling.elapsed(); + log!( + target: "service", + if polling_dur >= Duration::from_millis(50) { Level::Warn } else { Level::Trace }, + "Polling the network future took {:?}", + polling_dur + ); + Ok(Async::NotReady) })) @@ -446,7 +449,7 @@ pub trait ServiceTrait: + Send + 'static + StartRPC - + TestRuntime + + NetworkFutureBuilder + MaintainTransactionPool + OffchainWorker {} @@ -455,7 +458,7 @@ impl ServiceTrait for T where + Send + 'static + StartRPC - + TestRuntime + + NetworkFutureBuilder + MaintainTransactionPool + OffchainWorker {} diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 419e8afd87b4b..f72779d7f7a54 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -28,13 +28,11 @@ pub mod error; use std::io; use std::marker::PhantomData; use std::net::SocketAddr; -use std::time::{Duration, Instant}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use futures::sync::mpsc; use parking_lot::Mutex; -use libp2p::Multiaddr; use client::{self}; -use crate::components::TestRuntime; +use crate::components::NetworkFutureBuilder; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; @@ -42,7 +40,7 @@ use futures::prelude::*; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use keystore::Store as Keystore; use network::{NetworkState, NetworkStateInfo}; -use log::{log, info, warn, debug, error, Level}; +use log::{info, warn, debug, error}; use parity_codec::{Encode, Decode}; use primitives::{Pair, ed25519, sr25519, crypto}; use sr_primitives::generic::BlockId; @@ -50,7 +48,6 @@ use sr_primitives::traits::{Header, NumberFor, SaturatedConversion, Zero}; use substrate_executor::NativeExecutor; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; -use network::{Event, DhtEvent}; pub use self::error::Error; pub use config::{Configuration, Roles, PruningMode}; @@ -195,7 +192,6 @@ impl Service { // FIXME #1063 remove this if let Some(keystore) = keystore.as_mut() { for seed in &config.keys { - println!("==== seed: {}", seed); keystore.generate_from_seed::(seed)?; keystore.generate_from_seed::(seed)?; } @@ -208,14 +204,7 @@ impl Service { info!("Generated a new keypair: {:?}", public_key); public_key.to_string() } - }; - - match keystore.contents::()?.get(0) { - Some(public_key) => println!("====== Second public key: {}", public_key.to_string()), - None => { - println!("======= no second public key"); - } - }; + } } else { public_key = format!(""); } @@ -439,14 +428,12 @@ impl Service { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers::(&config, gen_handler)?; - let _ = to_spawn_tx.unbounded_send(Box::new(Components::RuntimeServices::test_runtime( + let _ = to_spawn_tx.unbounded_send(Box::new(Components::RuntimeServices::build_network_future( network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, has_bootnodes, - // TODO: Public key might be , handle that! - public_key.clone(), keystore_authority_key.clone(), ) .map_err(|_| ()) @@ -640,142 +627,6 @@ impl Executor + Send>> } } - -/// Builds a never-ending future that continuously polls the network. -/// -/// The `status_sink` contain a list of senders to send a periodic network status to. -// TODO delete. -fn build_network_future< - Components: components::Components, - S:network::specialization::NetworkSpecialization> , - H: network::ExHashT -> ( - mut network: network::NetworkWorker, S, H>, - client: Arc>, - status_sinks: Arc>, NetworkState)>>>>, - mut rpc_rx: mpsc::UnboundedReceiver>>, - should_have_peers: bool, - public_key: String, -) -> impl Future { - // Interval at which we send status updates on the status stream. - const STATUS_INTERVAL: Duration = Duration::from_millis(5000); - - println!("==== public key {}", public_key); - let hashed_public_key = libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, &public_key.as_bytes()).unwrap(); - - let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); - - let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); - - let mut imported_blocks_stream = client.import_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - let mut finality_notification_stream = client.finality_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - - futures::future::poll_fn(move || { - let before_polling = Instant::now(); - - // We poll `imported_blocks_stream`. - while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { - network.on_block_imported(notification.hash, notification.header); - } - - while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { - println!("==== public key {}", public_key); - let external_addresses = network.external_addresses(); - - println!("==== external addresses: {:?}", external_addresses); - - // println!("network state: {:?}", network.network_state()); - - // TODO: Remove unwrap. - let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); - - network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); - - // TODO: Let's trigger a search for us for now. Remove. - network.service().get_value(&hashed_public_key.clone()); - } - - - - // We poll `finality_notification_stream`, but we only take the last event. - let mut last = None; - while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { - last = Some(item); - } - if let Some(notification) = last { - network.on_block_finalized(notification.hash, notification.header); - } - - // Poll the RPC requests and answer them. - while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { - match request { - rpc::apis::system::Request::Health(sender) => { - let _ = sender.send(rpc::apis::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.service().is_major_syncing(), - should_have_peers, - }); - }, - rpc::apis::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - rpc::apis::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - ).collect()); - } - rpc::apis::system::Request::NetworkState(sender) => { - let _ = sender.send(network.network_state()); - } - }; - } - - // Interval report for the external API. - while let Ok(Async::Ready(_)) = status_interval.poll() { - let status = NetworkStatus { - sync_state: network.sync_state(), - best_seen_block: network.best_seen_block(), - num_sync_peers: network.num_sync_peers(), - num_connected_peers: network.num_connected_peers(), - num_active_peers: network.num_active_peers(), - average_download_per_sec: network.average_download_per_sec(), - average_upload_per_sec: network.average_upload_per_sec(), - }; - let state = network.network_state(); - - status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); - } - - while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { - warn!(target: "service", "Error in network: {:?}", err); - }) { - for (key, value) in values.iter() { - let value = std::str::from_utf8(value).unwrap(); - - let external_addresses: HashSet = serde_json::from_str(value).unwrap(); - - println!("==== Found the following external addresses on the DHT: {:?}", external_addresses); - } - } - - // Now some diagnostic for performances. - let polling_dur = before_polling.elapsed(); - log!( - target: "service", - if polling_dur >= Duration::from_millis(50) { Level::Warn } else { Level::Trace }, - "Polling the network future took {:?}", - polling_dur - ); - - Ok(Async::NotReady) - }) -} - /// Overview status of the network. #[derive(Clone)] pub struct NetworkStatus { diff --git a/core/session/primitives/src/lib.rs b/core/session/primitives/src/lib.rs index 7c9d210d768de..3dbdec431a60c 100644 --- a/core/session/primitives/src/lib.rs +++ b/core/session/primitives/src/lib.rs @@ -22,6 +22,7 @@ use parity_codec::{Codec}; use client::decl_runtime_apis; use rstd::vec::Vec; +// TODO: Remove entire module. decl_runtime_apis! { pub trait SessionApi { fn validators() -> Vec; diff --git a/node/cli/src/chain_spec.rs b/node/cli/src/chain_spec.rs index 7dc3e28761b43..d2fd0cfbc5100 100644 --- a/node/cli/src/chain_spec.rs +++ b/node/cli/src/chain_spec.rs @@ -331,14 +331,11 @@ pub fn development_config() -> ChainSpec { } fn local_testnet_genesis() -> GenesisConfig { - let authorities = vec![ + testnet_genesis( + vec![ get_authority_keys_from_seed("Alice"), get_authority_keys_from_seed("Bob"), - ]; - - println!("==== authorities inside node chainspec: {:?}", authorities); - testnet_genesis( - authorities, + ], get_account_id_from_seed("Alice"), None, false, diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index 7072c50ec5c60..6be19f02ae000 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -493,12 +493,6 @@ impl_runtime_apis! { } } - impl client_api::KeyTypeGetter for Runtime { - fn get_key_type() -> substrate_primitives::crypto::KeyTypeId { - substrate_primitives::crypto::key_types::ED25519 - } - } - impl offchain_primitives::OffchainWorkerApi for Runtime { fn offchain_worker(number: NumberFor) { Executive::offchain_worker(number) From de8b99810d541baff0829ab03c09f56a56ef5951 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 24 Jul 2019 16:38:23 +0200 Subject: [PATCH 06/11] core/service: Match received external addresses with their public key --- core/service/src/components.rs | 88 ++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 24 deletions(-) diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 065e94e39a095..0aaabe0d3d15c 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -339,25 +339,47 @@ impl NetworkFutureBuilder for C where let id = BlockId::hash( client.info().chain.best_hash); // TODO: remove unwrap(). - let public_key = keystore.authority_key( &id).map(|k| k.public().to_string()).unwrap(); - println!("=== authority key: {}", public_key); - let hashed_public_key = libp2p::multihash::encode( - libp2p::multihash::Hash::SHA2256, - &public_key.as_bytes(), - ).unwrap(); - - let external_addresses = network.external_addresses(); - println!("==== external addresses: {:?}", external_addresses); - - // TODO: Remove unwrap. - let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); + match keystore.authority_key( &id).map(|k| k.public().to_string()) { + Some(public_key) => { + println!("=== authority key: {}", public_key); + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + &public_key.as_bytes(), + ).unwrap(); + + let external_addresses = network.external_addresses(); + println!("==== external addresses: {:?}", external_addresses); + + // TODO: Remove unwrap. + let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); + + // TODO: Sign the payload before putting it on the DHT. + network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); + }, + None => { + println!("==== Got no authority key"); + } + } - network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); - // TODO: This gets the Aura authorities, what if a user uses babe? That should be abstracted. - // println!("=== validators: {:?}", client.runtime_api().authorities(&id)); - // TODO: Let's trigger a search for us for now. Remove. - network.service().get_value(&hashed_public_key.clone()); + // TODO: Get authorities from current consensus, whatever that might be. + // match client.runtime_api().authorities(&id) { + // Ok(authorities) => { + // for authority in authorities.iter() { + // println!("==== querying dht for authority: {}", authority.to_string()); + // // TODO: Remove unwrap. + // let hashed_public_key = libp2p::multihash::encode( + // libp2p::multihash::Hash::SHA2256, + // authority.to_string().as_bytes(), + // ).unwrap(); + + // network.service().get_value(&hashed_public_key.clone()); + // } + // }, + // Err(e) => { + // println!("==== Got no authorities, but an error: {:?}", e); + // } + // } } @@ -418,12 +440,30 @@ impl NetworkFutureBuilder for C where while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err); }) { - for (_key, value) in values.iter() { - let value = std::str::from_utf8(value).unwrap(); - - let external_addresses: HashSet = serde_json::from_str(value).unwrap(); - - println!("==== Found the following external addresses on the DHT: {:?}", external_addresses); + for (key, value) in values.iter() { + let id = BlockId::hash( client.info().chain.best_hash); + match client.runtime_api().authorities(&id) { + Ok(authorities) => { + for authority in authorities.iter() { + // TODO: Remove unwrap. + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + authority.to_string().as_bytes(), + ).unwrap(); + + if *key == hashed_public_key { + let value = std::str::from_utf8(value).unwrap(); + + let external_addresses: HashSet = serde_json::from_str(value).unwrap(); + + println!("==== Got key {:?} value {:?} from DHT", authority.to_string(), external_addresses); + } + } + }, + Err(e) => { + println!("==== Got no authorities, but an error: {:?}", e); + } + } } } @@ -499,7 +539,7 @@ pub trait ServiceFactory: 'static + Sized { /// The Fork Choice Strategy for the chain type SelectChain: SelectChain + 'static; /// - type AuthorityId: parity_codec::Codec + std::fmt::Debug; + type AuthorityId: parity_codec::Codec + std::fmt::Debug + std::string::ToString; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. From 76642e633f09ed864ad0e37b3508e396a1b8d131 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 25 Jul 2019 15:05:01 +0200 Subject: [PATCH 07/11] core/service: Advice peerset to connect to discovered nodes --- core/service/src/components.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 0aaabe0d3d15c..5f4547a5aee15 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -336,6 +336,7 @@ impl NetworkFutureBuilder for C where } while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { + println!("==== We are connected to {} nodes", network.service().num_connected()); let id = BlockId::hash( client.info().chain.best_hash); // TODO: remove unwrap(). @@ -350,8 +351,15 @@ impl NetworkFutureBuilder for C where let external_addresses = network.external_addresses(); println!("==== external addresses: {:?}", external_addresses); + let enriched_addresses: Vec = external_addresses.iter().map(|a| { + let mut a = a.clone(); + // TODO: Don't get peer id on each iteration. + a.push(libp2p::core::multiaddr::Protocol::P2p(network.service().peer_id().into())); + a + }).collect(); + // TODO: Remove unwrap. - let serialized_addresses = serde_json::to_string(&external_addresses).unwrap(); + let serialized_addresses = serde_json::to_string(&enriched_addresses).unwrap(); // TODO: Sign the payload before putting it on the DHT. network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); @@ -454,9 +462,13 @@ impl NetworkFutureBuilder for C where if *key == hashed_public_key { let value = std::str::from_utf8(value).unwrap(); - let external_addresses: HashSet = serde_json::from_str(value).unwrap(); + let external_addresses: HashSet = serde_json::from_str(value).unwrap(); + println!("==== Got key {:?} value {:?} from DHT", authority.0.to_string(), external_addresses); - println!("==== Got key {:?} value {:?} from DHT", authority.to_string(), external_addresses); + for address in external_addresses.iter() { + // TODO: Why does add_reserved_peer take a string? + network.service().add_reserved_peer(address.to_string()); + } } } }, From fa945bdba6ad44f4c84db98b304c731c25e0e853 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 25 Jul 2019 16:13:44 +0200 Subject: [PATCH 08/11] core/consensus,node/runtime: Declare and implement authorities endpoint The goal of the commit is to be able to retrieve the current set of authorities without needing to know the concrete consensus mechanism in place. In order to achieve the above this commit introduces the `core/consensus/common/primitives` crate, declaring the `ConsensusApi` runtime API. In addition it implements the above mentioned trait definition in `node/runtime` by returning the current authorities of the BABE consensus mechanism. --- Cargo.lock | 1 - node/runtime/Cargo.toml | 2 -- 2 files changed, 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f154f671f2a9..e30dcd3709fb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2370,7 +2370,6 @@ dependencies = [ "substrate-keyring 2.0.0", "substrate-offchain-primitives 2.0.0", "substrate-primitives 2.0.0", - "substrate-session-primitives 2.0.0", "substrate-wasm-builder-runner 1.0.2", ] diff --git a/node/runtime/Cargo.toml b/node/runtime/Cargo.toml index 774097b530b57..c40e6d779644c 100644 --- a/node/runtime/Cargo.toml +++ b/node/runtime/Cargo.toml @@ -37,7 +37,6 @@ treasury = { package = "srml-treasury", path = "../../srml/treasury", default-fe sudo = { package = "srml-sudo", path = "../../srml/sudo", default-features = false } im-online = { package = "srml-im-online", path = "../../srml/im-online", default-features = false } node-primitives = { path = "../primitives", default-features = false } -session_primitives = { package = "substrate-session-primitives", path = "../../core/session/primitives", default-features = false } rustc-hex = { version = "2.0", optional = true } serde = { version = "1.0", optional = true } substrate-keyring = { path = "../../core/keyring", optional = true } @@ -80,7 +79,6 @@ std = [ "serde", "safe-mix/std", "client/std", - "session_primitives/std", "rustc-hex", "substrate-keyring", "offchain-primitives/std", From 72da73f049479f3e8eb75874e0db26f31dd24653 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 25 Jul 2019 16:23:38 +0200 Subject: [PATCH 09/11] core/service: Use common consensus authorities api --- Cargo.lock | 12 +------ core/service/Cargo.toml | 2 +- core/service/src/components.rs | 44 +++++++++++++------------- core/session/primitives/Cargo.toml | 21 ------------ core/session/primitives/src/lib.rs | 30 ------------------ scripts/sentry-node/docker-compose.yml | 13 ++++---- 6 files changed, 31 insertions(+), 91 deletions(-) delete mode 100644 core/session/primitives/Cargo.toml delete mode 100644 core/session/primitives/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index e30dcd3709fb4..bc991f760dcaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4764,6 +4764,7 @@ dependencies = [ "substrate-client-db 2.0.0", "substrate-consensus-aura-primitives 2.0.0", "substrate-consensus-common 2.0.0", + "substrate-consensus-common-primitives 2.0.0", "substrate-executor 2.0.0", "substrate-finality-grandpa 2.0.0", "substrate-keystore 2.0.0", @@ -4771,7 +4772,6 @@ dependencies = [ "substrate-offchain 2.0.0", "substrate-primitives 2.0.0", "substrate-rpc-servers 2.0.0", - "substrate-session-primitives 2.0.0", "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", "substrate-transaction-pool 2.0.0", @@ -4799,16 +4799,6 @@ dependencies = [ "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "substrate-session-primitives" -version = "2.0.0" -dependencies = [ - "parity-codec 4.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "sr-primitives 2.0.0", - "sr-std 2.0.0", - "substrate-client 2.0.0", -] - [[package]] name = "substrate-state-db" version = "2.0.0" diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 4a9fa30721186..cb17c4b7e2a70 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -35,7 +35,7 @@ tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } libp2p = { version = "0.11.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } -session_primitives = { package = "substrate-session-primitives", path = "../../core/session/primitives", default-features = false } +consensus_common_primitives = { package = "substrate-consensus-common-primitives", path = "../../core/consensus/common/primitives", default-features = false } consensus_aura_primitives = { package = "substrate-consensus-aura-primitives", path = "../../core/consensus/aura/primitives", default-features = false } [dev-dependencies] diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 5f4547a5aee15..324254bc98610 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -28,6 +28,7 @@ use client::{BlockchainEvents}; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use consensus_aura_primitives::AuraApi; use offchain::AuthorityKeyProvider as _; +use consensus_common_primitives::ConsensusApi; use network::{Event, DhtEvent}; use client_db; @@ -300,8 +301,8 @@ impl NetworkFutureBuilder for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::Metadata>, // as ProvideRuntimeApi>::Api: runtime_api::KeyTypeGetter>, - // as ProvideRuntimeApi>::Api: session_primitives::SessionApi, ::AuthorityId>, -// as ProvideRuntimeApi>::Api: consensus_aura_primitives::AuraApi, ::AuthorityId>, + as ProvideRuntimeApi>::Api: consensus_common_primitives::ConsensusApi, ::AuthorityId>, +// as ProvideRuntimeApi>::Api: consensus_babe_primitives::BabeApi>, <<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString { fn build_network_future< @@ -369,25 +370,23 @@ impl NetworkFutureBuilder for C where } } - - // TODO: Get authorities from current consensus, whatever that might be. - // match client.runtime_api().authorities(&id) { - // Ok(authorities) => { - // for authority in authorities.iter() { - // println!("==== querying dht for authority: {}", authority.to_string()); - // // TODO: Remove unwrap. - // let hashed_public_key = libp2p::multihash::encode( - // libp2p::multihash::Hash::SHA2256, - // authority.to_string().as_bytes(), - // ).unwrap(); - - // network.service().get_value(&hashed_public_key.clone()); - // } - // }, - // Err(e) => { - // println!("==== Got no authorities, but an error: {:?}", e); - // } - // } + match client.runtime_api().authorities(&id) { + Ok(authorities) => { + for authority in authorities.iter() { + println!("==== querying dht for authority: {}", authority.to_string()); + // TODO: Remove unwrap. + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + authority.to_string().as_bytes(), + ).unwrap(); + + network.service().get_value(&hashed_public_key.clone()); + } + }, + Err(e) => { + println!("==== Got no authorities, but an error: {:?}", e); + } + } } @@ -450,6 +449,7 @@ impl NetworkFutureBuilder for C where }) { for (key, value) in values.iter() { let id = BlockId::hash( client.info().chain.best_hash); + match client.runtime_api().authorities(&id) { Ok(authorities) => { for authority in authorities.iter() { @@ -463,7 +463,7 @@ impl NetworkFutureBuilder for C where let value = std::str::from_utf8(value).unwrap(); let external_addresses: HashSet = serde_json::from_str(value).unwrap(); - println!("==== Got key {:?} value {:?} from DHT", authority.0.to_string(), external_addresses); + println!("==== Got key {:?} value {:?} from DHT", authority.to_string(), external_addresses); for address in external_addresses.iter() { // TODO: Why does add_reserved_peer take a string? diff --git a/core/session/primitives/Cargo.toml b/core/session/primitives/Cargo.toml deleted file mode 100644 index dd573e40c122d..0000000000000 --- a/core/session/primitives/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "substrate-session-primitives" -version = "2.0.0" -authors = ["Parity Technologies "] -description = "Primitives for session" -edition = "2018" - -[dependencies] -parity-codec = { version = "4.1.1", default-features = false } -client = { package = "substrate-client", path = "../../client", default-features = false } -runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives", default-features = false } -rstd = { package = "sr-std", path = "../../sr-std", default-features = false } - -[features] -default = ["std"] -std = [ - "rstd/std", - "client/std", - "parity-codec/std", - "runtime_primitives/std" -] diff --git a/core/session/primitives/src/lib.rs b/core/session/primitives/src/lib.rs deleted file mode 100644 index 3dbdec431a60c..0000000000000 --- a/core/session/primitives/src/lib.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! The Session runtime api primitives. - -#![cfg_attr(not(feature = "std"), no_std)] - -use parity_codec::{Codec}; -use client::decl_runtime_apis; -use rstd::vec::Vec; - -// TODO: Remove entire module. -decl_runtime_apis! { - pub trait SessionApi { - fn validators() -> Vec; - } -} diff --git a/scripts/sentry-node/docker-compose.yml b/scripts/sentry-node/docker-compose.yml index 78f8ba36b82ee..e07d1e8145251 100644 --- a/scripts/sentry-node/docker-compose.yml +++ b/scripts/sentry-node/docker-compose.yml @@ -30,7 +30,7 @@ services: - ../../target/release/substrate:/usr/local/bin/substrate image: parity/substrate networks: - - network-a + - internet command: # Local node id: QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR - "--node-key" @@ -57,6 +57,7 @@ services: - "--no-telemetry" - "--rpc-cors" - "all" + - "--no-mdns" sentry-a: image: parity/substrate @@ -83,9 +84,9 @@ services: # - "--validator" - "--name" - "CharliesNode" - - "--bootnodes" + - "--reserved-nodes" - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" - - "--bootnodes" + - "--reserved-nodes" - "/dns4/validator-b/tcp/30333/p2p/QmSVnNf9HwVMT1Y4cK1P6aoJcEZjmoTXpjKBmAABLMnZEk" - "--no-telemetry" - "--rpc-cors" @@ -96,6 +97,7 @@ services: # Make sure sentry-a still participates as a grandpa voter to forward # grandpa finality gossip messages. - "--grandpa-voter" + - "--no-mdns" validator-b: image: parity/substrate @@ -119,9 +121,7 @@ services: - "--validator" - "--name" - "BobsNode" - - "--bootnodes" - - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" - - "--bootnodes" + - "--reserved-nodes" - "/dns4/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi" - "--no-telemetry" - "--rpc-cors" @@ -129,6 +129,7 @@ services: # Not only bind to localhost. - "--ws-external" - "--rpc-external" + - "--no-mdns" ui: image: polkadot-js/apps From 9c6ae0e915f5329781a4db387ef5c1fced5ca247 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 26 Jul 2019 16:23:04 +0200 Subject: [PATCH 10/11] core/service: Verify signature before adding node When retrieving addresses of other validators via the DHT, make sure they are within the current validator set and the payload is properly signed with the authority key. --- core/service/src/components.rs | 70 ++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 21 deletions(-) diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 324254bc98610..5448dcf3e87fe 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -21,12 +21,10 @@ use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; use std::time::{Duration, Instant}; use log::{log, warn, Level}; -use std::collections::HashSet; use libp2p::Multiaddr; use parking_lot::Mutex; use client::{BlockchainEvents}; use futures03::stream::{StreamExt as _, TryStreamExt as _}; -use consensus_aura_primitives::AuraApi; use offchain::AuthorityKeyProvider as _; use consensus_common_primitives::ConsensusApi; @@ -301,9 +299,9 @@ impl NetworkFutureBuilder for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::Metadata>, // as ProvideRuntimeApi>::Api: runtime_api::KeyTypeGetter>, +// TODO: This is a mess. Can this be done cleaner? as ProvideRuntimeApi>::Api: consensus_common_primitives::ConsensusApi, ::AuthorityId>, -// as ProvideRuntimeApi>::Api: consensus_babe_primitives::BabeApi>, -<<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString +<<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString, { fn build_network_future< H: network::ExHashT, @@ -341,13 +339,14 @@ impl NetworkFutureBuilder for C where let id = BlockId::hash( client.info().chain.best_hash); // TODO: remove unwrap(). - match keystore.authority_key( &id).map(|k| k.public().to_string()) { - Some(public_key) => { + match keystore.authority_key( &id) { + Some(authority_key) => { + let public_key = authority_key.public().to_string(); println!("=== authority key: {}", public_key); let hashed_public_key = libp2p::multihash::encode( libp2p::multihash::Hash::SHA2256, &public_key.as_bytes(), - ).unwrap(); + ).expect("public key hashing not to fail"); let external_addresses = network.external_addresses(); println!("==== external addresses: {:?}", external_addresses); @@ -360,10 +359,22 @@ impl NetworkFutureBuilder for C where }).collect(); // TODO: Remove unwrap. - let serialized_addresses = serde_json::to_string(&enriched_addresses).unwrap(); + let signature = authority_key.sign( + &serde_json::to_string(&enriched_addresses) + .map(|s| s.into_bytes()) + .expect("enriched_address marshaling not to fail") + ); - // TODO: Sign the payload before putting it on the DHT. - network.service().put_value(hashed_public_key.clone(), serialized_addresses.as_bytes().to_vec()); + let sig_bytes: &[u8] = signature.as_ref(); + + let sig_vec: Vec = sig_bytes.to_vec(); + + println!("===== signature authorityid {}: {:?}", public_key, sig_vec); + + // TODO: Remove unwrap. + let payload = serde_json::to_string(&(enriched_addresses, sig_vec)).expect("payload marshaling not to fail"); + + network.service().put_value(hashed_public_key, payload.into_bytes()); }, None => { println!("==== Got no authority key"); @@ -378,7 +389,7 @@ impl NetworkFutureBuilder for C where let hashed_public_key = libp2p::multihash::encode( libp2p::multihash::Hash::SHA2256, authority.to_string().as_bytes(), - ).unwrap(); + ).expect("public key hashing not to fail"); network.service().get_value(&hashed_public_key.clone()); } @@ -457,17 +468,33 @@ impl NetworkFutureBuilder for C where let hashed_public_key = libp2p::multihash::encode( libp2p::multihash::Hash::SHA2256, authority.to_string().as_bytes(), - ).unwrap(); + ).expect("public key hashing not to fail"); if *key == hashed_public_key { - let value = std::str::from_utf8(value).unwrap(); - - let external_addresses: HashSet = serde_json::from_str(value).unwrap(); - println!("==== Got key {:?} value {:?} from DHT", authority.to_string(), external_addresses); - - for address in external_addresses.iter() { - // TODO: Why does add_reserved_peer take a string? - network.service().add_reserved_peer(address.to_string()); + let value = std::str::from_utf8(value).expect("value to string not to fail"); + + let (addresses, signature): (Vec, Vec) = serde_json::from_str(value).expect("payload unmarshaling not to fail"); + println!("==== Got key {:?} value {:?} from DHT", authority.to_string(), addresses); + + let sig_bytes: &[u8] = &signature; + + println!("===== got signature authorityid {}: {:?}", authority.to_string(), signature); + + // TODO: is using verify-weak a problem here? + if <::Factory as ServiceFactory>::ConsensusPair::verify_weak( + sig_bytes, + &serde_json::to_string(&addresses) + .map(|s| s.into_bytes()) + .expect("address marshaling not to fail"), + authority, + ) { + for address in addresses.iter() { + // TODO: Why does add_reserved_peer take a string? + // TODO: Remove unwrap. + network.service().add_reserved_peer(address.to_string()).expect("adding reserved peer not to fail"); + } + } else { + println!("==== signature not valid"); } } } @@ -551,7 +578,8 @@ pub trait ServiceFactory: 'static + Sized { /// The Fork Choice Strategy for the chain type SelectChain: SelectChain + 'static; /// - type AuthorityId: parity_codec::Codec + std::fmt::Debug + std::string::ToString; + // TODO: Are all of these traits necessary? + type AuthorityId: primitives::crypto::Public + std::hash::Hash + parity_codec::Codec + std::fmt::Debug + std::string::ToString; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. From bbeea1dc803cdd98d0384a750096bfca1741b080 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 29 Jul 2019 11:52:05 +0200 Subject: [PATCH 11/11] core/service: Refactor build_network_future --- core/network/src/service.rs | 8 +- core/service/src/components.rs | 220 +++++++++++++------------ node/cli/src/service.rs | 2 - node/primitives/src/lib.rs | 3 - scripts/sentry-node/docker-compose.yml | 13 +- 5 files changed, 118 insertions(+), 128 deletions(-) diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 8b87241548e6f..bc6f67933dc06 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -25,7 +25,7 @@ //! The methods of the [`NetworkService`] are implemented by sending a message over a channel, //! which is then processed by [`NetworkWorker::poll`]. -use std::{collections::{ HashMap, HashSet }, fs, marker::PhantomData, io, path::Path}; +use std::{collections::HashMap, fs, marker::PhantomData, io, path::Path}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use consensus::import_queue::{ImportQueue, Link}; @@ -260,12 +260,6 @@ impl, H: ExHashT> NetworkWorker self.network_service.user_protocol().num_connected_peers() } - /// Returns the local external addresses. - pub fn external_addresses(&self) -> HashSet { - let swarm = &self.network_service; - Swarm::::external_addresses(&swarm).cloned().collect() - } - /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { self.network_service.user_protocol().num_active_peers() diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 5448dcf3e87fe..ace3f0ec09bd4 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -281,42 +281,39 @@ impl OffchainWorker for C where } pub trait NetworkFutureBuilder { - fn build_network_future< - H: network::ExHashT, - S:network::specialization::NetworkSpecialization> , - >( + fn build_network_future( network: network::NetworkWorker, S, H >, client: Arc>, status_sinks: Arc>, NetworkState)>>>>, rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, - // TODO: still needed? - keystore: ComponentAuthorityKeyProvider, - )-> Box+ Send> ; + authority_key_provider: ComponentAuthorityKeyProvider, + )-> Box+ Send> + where + H: network::ExHashT, + S:network::specialization::NetworkSpecialization>; } -impl NetworkFutureBuilder for C where +impl NetworkFutureBuilder for C +where ComponentClient: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: runtime_api::Metadata>, - // as ProvideRuntimeApi>::Api: runtime_api::KeyTypeGetter>, -// TODO: This is a mess. Can this be done cleaner? - as ProvideRuntimeApi>::Api: consensus_common_primitives::ConsensusApi, ::AuthorityId>, -<<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString, + as ProvideRuntimeApi>::Api: ConsensusApi, ::AuthorityId>, + <<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString, { - fn build_network_future< - H: network::ExHashT, - S:network::specialization::NetworkSpecialization> , - >( + fn build_network_future( mut network: network::NetworkWorker, S, H>, client: Arc>, status_sinks: Arc>, NetworkState)>>>>, mut rpc_rx: mpsc::UnboundedReceiver>>, should_have_peers: bool, - keystore: ComponentAuthorityKeyProvider, - )-> Box + Send> { + authority_key_provider: ComponentAuthorityKeyProvider, + )-> Box + Send> + where + H: network::ExHashT, + S:network::specialization::NetworkSpecialization>, + { // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); - let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); @@ -338,49 +335,40 @@ impl NetworkFutureBuilder for C where println!("==== We are connected to {} nodes", network.service().num_connected()); let id = BlockId::hash( client.info().chain.best_hash); - // TODO: remove unwrap(). - match keystore.authority_key( &id) { - Some(authority_key) => { - let public_key = authority_key.public().to_string(); - println!("=== authority key: {}", public_key); - let hashed_public_key = libp2p::multihash::encode( - libp2p::multihash::Hash::SHA2256, - &public_key.as_bytes(), - ).expect("public key hashing not to fail"); + // Put our addresses on the DHT if we are a validator. + if let Some(authority_key) = authority_key_provider.authority_key( &id) { + let public_key = authority_key.public().to_string(); - let external_addresses = network.external_addresses(); - println!("==== external addresses: {:?}", external_addresses); + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + &public_key.as_bytes(), + ).expect("public key hashing not to fail"); - let enriched_addresses: Vec = external_addresses.iter().map(|a| { + let addresses: Vec = network.service().external_addresses() + .iter() + .map(|a| { let mut a = a.clone(); - // TODO: Don't get peer id on each iteration. a.push(libp2p::core::multiaddr::Protocol::P2p(network.service().peer_id().into())); a - }).collect(); - - // TODO: Remove unwrap. - let signature = authority_key.sign( - &serde_json::to_string(&enriched_addresses) - .map(|s| s.into_bytes()) - .expect("enriched_address marshaling not to fail") - ); + }) + .collect(); + println!("==== external addresses: {:?}", addresses); - let sig_bytes: &[u8] = signature.as_ref(); + // TODO: Remove unwrap. + let signature = authority_key.sign( + &serde_json::to_string(&addresses) + .map(|s| s.into_bytes()) + .expect("enriched_address marshaling not to fail") + ).as_ref().to_vec(); - let sig_vec: Vec = sig_bytes.to_vec(); + // TODO: Remove unwrap. + let payload = serde_json::to_string(&(addresses, signature)).expect("payload marshaling not to fail"); - println!("===== signature authorityid {}: {:?}", public_key, sig_vec); - - // TODO: Remove unwrap. - let payload = serde_json::to_string(&(enriched_addresses, sig_vec)).expect("payload marshaling not to fail"); - - network.service().put_value(hashed_public_key, payload.into_bytes()); - }, - None => { - println!("==== Got no authority key"); - } + network.service().put_value(hashed_public_key, payload.into_bytes()); } + // Query addresses of other validators. + // TODO: Should non-validators also do this? Probably not a good default. match client.runtime_api().authorities(&id) { Ok(authorities) => { for authority in authorities.iter() { @@ -400,8 +388,6 @@ impl NetworkFutureBuilder for C where } } - - // We poll `finality_notification_stream`, but we only take the last event. let mut last = None; while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { @@ -423,13 +409,13 @@ impl NetworkFutureBuilder for C where }, rpc::apis::system::Request::Peers(sender) => { let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - rpc::apis::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } + rpc::apis::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } ).collect()); } rpc::apis::system::Request::NetworkState(sender) => { @@ -454,57 +440,74 @@ impl NetworkFutureBuilder for C where status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); } - // Main network polling. - while let Ok(Async::Ready(Some(Event::Dht(DhtEvent::ValueFound(values))))) = network.poll().map_err(|err| { - warn!(target: "service", "Error in network: {:?}", err); - }) { + let authorities = client.runtime_api().authorities(&BlockId::hash(client.info().chain.best_hash)); + let valid_authority = |a: &libp2p::multihash::Multihash| { + match &authorities { + Ok(authorities) => { + for authority in authorities.iter() { + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + authority.to_string().as_bytes(), + ).expect("public key hashing not to fail"); + + // TODO: Comparing two pointers is safe, right? Given they are not fat-pointers. + if a == &hashed_public_key { + return Some(authority.clone()); + } + } + }, + // TODO: Should we handle the error here? + Err(_e) => {}, + } + + return None; + }; + + // TODO: Can we do this nicer? + let network_service = network.service().clone(); + let add_reserved_peer = |values: Vec<(libp2p::multihash::Multihash, Vec)>| { for (key, value) in values.iter() { - let id = BlockId::hash( client.info().chain.best_hash); + // TODO: Should we log if it is not a valid one? + if let Some(authority_pub_key) = valid_authority(key) { + println!("===== adding other node"); + let value = std::str::from_utf8(value).expect("value to string not to fail"); - match client.runtime_api().authorities(&id) { - Ok(authorities) => { - for authority in authorities.iter() { + let (addresses, signature): (Vec, Vec) = serde_json::from_str(value).expect("payload unmarshaling not to fail"); + + // TODO: is using verify-weak a problem here? + if <::Factory as ServiceFactory>::ConsensusPair::verify_weak( + &signature, + &serde_json::to_string(&addresses) + .map(|s| s.into_bytes()) + .expect("address marshaling not to fail"), + authority_pub_key, + ) { + for address in addresses.iter() { + // TODO: Why does add_reserved_peer take a string? // TODO: Remove unwrap. - let hashed_public_key = libp2p::multihash::encode( - libp2p::multihash::Hash::SHA2256, - authority.to_string().as_bytes(), - ).expect("public key hashing not to fail"); - - if *key == hashed_public_key { - let value = std::str::from_utf8(value).expect("value to string not to fail"); - - let (addresses, signature): (Vec, Vec) = serde_json::from_str(value).expect("payload unmarshaling not to fail"); - println!("==== Got key {:?} value {:?} from DHT", authority.to_string(), addresses); - - let sig_bytes: &[u8] = &signature; - - println!("===== got signature authorityid {}: {:?}", authority.to_string(), signature); - - // TODO: is using verify-weak a problem here? - if <::Factory as ServiceFactory>::ConsensusPair::verify_weak( - sig_bytes, - &serde_json::to_string(&addresses) - .map(|s| s.into_bytes()) - .expect("address marshaling not to fail"), - authority, - ) { - for address in addresses.iter() { - // TODO: Why does add_reserved_peer take a string? - // TODO: Remove unwrap. - network.service().add_reserved_peer(address.to_string()).expect("adding reserved peer not to fail"); - } - } else { - println!("==== signature not valid"); - } - } + network_service.add_reserved_peer(address.to_string()).expect("adding reserved peer not to fail"); } - }, - Err(e) => { - println!("==== Got no authorities, but an error: {:?}", e); + } else { + // TODO: Log, don't print. + println!("==== signature not valid"); } + } else { + println!("==== Did not find a match for the key"); } } - } + }; + + // Main network polling. + while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + match event { + DhtEvent::ValueFound(values) => add_reserved_peer(values), + DhtEvent::ValueNotFound(_h) => println!("==== Didn't find hash"), + DhtEvent::ValuePut(_h) => {}, + DhtEvent::ValuePutFailed(_h) => println!("==== failed to put value on DHT"), + } + }; // Now some diagnostic for performances. let polling_dur = before_polling.elapsed(); @@ -516,7 +519,6 @@ impl NetworkFutureBuilder for C where ); Ok(Async::NotReady) - })) } @@ -578,8 +580,8 @@ pub trait ServiceFactory: 'static + Sized { /// The Fork Choice Strategy for the chain type SelectChain: SelectChain + 'static; /// - // TODO: Are all of these traits necessary? - type AuthorityId: primitives::crypto::Public + std::hash::Hash + parity_codec::Codec + std::fmt::Debug + std::string::ToString; + // TODO: Are all of these trait bounds necessary? + type AuthorityId: primitives::crypto::Public + std::hash::Hash + parity_codec::Codec + std::string::ToString; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 4782904f47306..c35ad4f6516f2 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -253,8 +253,6 @@ construct_service_factory! { FinalityProofProvider = { |client: Arc>| { Ok(Some(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _)) }}, - // TODO: Sure this is supposed to be called AuthorityId? SessionId and AuthorityId are not the same according to - // the srml session modul. AuthorityId = AuthorityId, } } diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index ebba08615a57d..0895a8675d56d 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -44,9 +44,6 @@ pub type Balance = u128; /// Type used for expressing timestamp. pub type Moment = u64; -/// Identity of Validator. -pub type ValidatorId = primitives::sr25519::Public; - /// Index of a transaction in the chain. pub type Index = u64; diff --git a/scripts/sentry-node/docker-compose.yml b/scripts/sentry-node/docker-compose.yml index e07d1e8145251..78f8ba36b82ee 100644 --- a/scripts/sentry-node/docker-compose.yml +++ b/scripts/sentry-node/docker-compose.yml @@ -30,7 +30,7 @@ services: - ../../target/release/substrate:/usr/local/bin/substrate image: parity/substrate networks: - - internet + - network-a command: # Local node id: QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR - "--node-key" @@ -57,7 +57,6 @@ services: - "--no-telemetry" - "--rpc-cors" - "all" - - "--no-mdns" sentry-a: image: parity/substrate @@ -84,9 +83,9 @@ services: # - "--validator" - "--name" - "CharliesNode" - - "--reserved-nodes" + - "--bootnodes" - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" - - "--reserved-nodes" + - "--bootnodes" - "/dns4/validator-b/tcp/30333/p2p/QmSVnNf9HwVMT1Y4cK1P6aoJcEZjmoTXpjKBmAABLMnZEk" - "--no-telemetry" - "--rpc-cors" @@ -97,7 +96,6 @@ services: # Make sure sentry-a still participates as a grandpa voter to forward # grandpa finality gossip messages. - "--grandpa-voter" - - "--no-mdns" validator-b: image: parity/substrate @@ -121,7 +119,9 @@ services: - "--validator" - "--name" - "BobsNode" - - "--reserved-nodes" + - "--bootnodes" + - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" + - "--bootnodes" - "/dns4/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi" - "--no-telemetry" - "--rpc-cors" @@ -129,7 +129,6 @@ services: # Not only bind to localhost. - "--ws-external" - "--rpc-external" - - "--no-mdns" ui: image: polkadot-js/apps