diff --git a/Cargo.lock b/Cargo.lock index 4b44d707371fd..bc991f760dcaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4747,6 +4747,7 @@ dependencies = [ "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "node-executor 2.0.0", "node-primitives 2.0.0", @@ -4761,7 +4762,9 @@ dependencies = [ "sr-primitives 2.0.0", "substrate-client 2.0.0", "substrate-client-db 2.0.0", + "substrate-consensus-aura-primitives 2.0.0", "substrate-consensus-common 2.0.0", + "substrate-consensus-common-primitives 2.0.0", "substrate-executor 2.0.0", "substrate-finality-grandpa 2.0.0", "substrate-keystore 2.0.0", diff --git a/core/client/src/runtime_api.rs b/core/client/src/runtime_api.rs index 890e265878dba..e87cd51863475 100644 --- a/core/client/src/runtime_api.rs +++ b/core/client/src/runtime_api.rs @@ -184,4 +184,3 @@ decl_runtime_apis! { fn validate_transaction(tx: ::Extrinsic) -> TransactionValidity; } } - diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index 28437901e2154..4776b671c6544 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -192,6 +192,7 @@ pub use service::{ NetworkStateInfo, }; pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] diff --git a/core/network/src/protocol/event.rs b/core/network/src/protocol/event.rs index 2edbb0fbf7563..1023ced87b802 100644 --- a/core/network/src/protocol/event.rs +++ b/core/network/src/protocol/event.rs @@ -20,6 +20,7 @@ use libp2p::multihash::Multihash; /// Events generated by DHT as a response to get_value and put_value requests. +#[derive(Debug, Clone)] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Multihash, Vec)>), @@ -35,6 +36,7 @@ pub enum DhtEvent { } /// Type for events generated by networking layer. +#[derive(Debug, Clone)] pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), diff --git a/core/network/src/service.rs b/core/network/src/service.rs index acd3bbeab7b10..bc6f67933dc06 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -589,11 +589,11 @@ pub struct NetworkWorker, H: Ex on_demand_in: Option>>, } -impl, H: ExHashT> Future for NetworkWorker { - type Item = (); +impl, H: ExHashT> Stream for NetworkWorker { + type Item = Event; type Error = io::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll, Self::Error> { // Poll the import queue for actions to perform. let _ = futures03::future::poll_fn(|cx| { self.import_queue.poll_actions(cx, &mut NetworkLink { @@ -613,7 +613,7 @@ impl, H: ExHashT> Future for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::NotReady), Ok(Async::NotReady) => break, }; @@ -654,8 +654,9 @@ impl, H: ExHashT> Future for Ne Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { self.network_service.user_protocol_mut() - .on_event(Event::Dht(ev)); - CustomMessageOutcome::None + .on_event(Event::Dht(ev.clone())); + + return Ok(Async::Ready(Some(Event::Dht(ev)))); }, Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 4ce3facb52372..cb17c4b7e2a70 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -34,6 +34,9 @@ rpc = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" } tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } +libp2p = { version = "0.11.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } +consensus_common_primitives = { package = "substrate-consensus-common-primitives", path = "../../core/consensus/common/primitives", default-features = false } +consensus_aura_primitives = { package = "substrate-consensus-aura-primitives", path = "../../core/consensus/aura/primitives", default-features = false } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index dff6161f16523..ace3f0ec09bd4 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -19,9 +19,19 @@ use std::{sync::Arc, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; +use std::time::{Duration, Instant}; +use log::{log, warn, Level}; +use libp2p::Multiaddr; +use parking_lot::Mutex; +use client::{BlockchainEvents}; +use futures03::stream::{StreamExt as _, TryStreamExt as _}; +use offchain::AuthorityKeyProvider as _; +use consensus_common_primitives::ConsensusApi; + +use network::{Event, DhtEvent}; use client_db; use client::{self, Client, runtime_api}; -use crate::{error, Service, AuthorityKeyProvider}; +use crate::{error, Service, AuthorityKeyProvider, NetworkStatus}; use consensus_common::{import_queue::ImportQueue, SelectChain}; use network::{self, OnDemand, FinalityProofProvider, NetworkStateInfo, config::BoxFinalityProofRequestBuilder}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; @@ -29,6 +39,8 @@ use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as use sr_primitives::{ BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId }; + +use network::NetworkState; use crate::config::Configuration; use primitives::{Blake2Hasher, H256, Pair}; use rpc::{self, apis::system::SystemInfo}; @@ -268,12 +280,257 @@ impl OffchainWorker for C where } } +pub trait NetworkFutureBuilder { + fn build_network_future( + network: network::NetworkWorker, S, H >, + client: Arc>, + status_sinks: Arc>, NetworkState)>>>>, + rpc_rx: mpsc::UnboundedReceiver>>, + should_have_peers: bool, + authority_key_provider: ComponentAuthorityKeyProvider, + )-> Box+ Send> + where + H: network::ExHashT, + S:network::specialization::NetworkSpecialization>; +} + +impl NetworkFutureBuilder for C +where + ComponentClient: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: ConsensusApi, ::AuthorityId>, + <<::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString, +{ + fn build_network_future( + mut network: network::NetworkWorker, S, H>, + client: Arc>, + status_sinks: Arc>, NetworkState)>>>>, + mut rpc_rx: mpsc::UnboundedReceiver>>, + should_have_peers: bool, + authority_key_provider: ComponentAuthorityKeyProvider, + )-> Box + Send> + where + H: network::ExHashT, + S:network::specialization::NetworkSpecialization>, + { + // Interval at which we send status updates on the status stream. + const STATUS_INTERVAL: Duration = Duration::from_millis(5000); + let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); + + let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5)); + + let mut imported_blocks_stream = client.import_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); + let mut finality_notification_stream = client.finality_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); + + Box::new(futures::future::poll_fn(move || { + let before_polling = Instant::now(); + + // We poll `imported_blocks_stream`. + while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { + network.on_block_imported(notification.hash, notification.header); + } + + while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() { + println!("==== We are connected to {} nodes", network.service().num_connected()); + let id = BlockId::hash( client.info().chain.best_hash); + + // Put our addresses on the DHT if we are a validator. + if let Some(authority_key) = authority_key_provider.authority_key( &id) { + let public_key = authority_key.public().to_string(); + + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + &public_key.as_bytes(), + ).expect("public key hashing not to fail"); + + let addresses: Vec = network.service().external_addresses() + .iter() + .map(|a| { + let mut a = a.clone(); + a.push(libp2p::core::multiaddr::Protocol::P2p(network.service().peer_id().into())); + a + }) + .collect(); + println!("==== external addresses: {:?}", addresses); + + // TODO: Remove unwrap. + let signature = authority_key.sign( + &serde_json::to_string(&addresses) + .map(|s| s.into_bytes()) + .expect("enriched_address marshaling not to fail") + ).as_ref().to_vec(); + + // TODO: Remove unwrap. + let payload = serde_json::to_string(&(addresses, signature)).expect("payload marshaling not to fail"); + + network.service().put_value(hashed_public_key, payload.into_bytes()); + } + + // Query addresses of other validators. + // TODO: Should non-validators also do this? Probably not a good default. + match client.runtime_api().authorities(&id) { + Ok(authorities) => { + for authority in authorities.iter() { + println!("==== querying dht for authority: {}", authority.to_string()); + // TODO: Remove unwrap. + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + authority.to_string().as_bytes(), + ).expect("public key hashing not to fail"); + + network.service().get_value(&hashed_public_key.clone()); + } + }, + Err(e) => { + println!("==== Got no authorities, but an error: {:?}", e); + } + } + } + + // We poll `finality_notification_stream`, but we only take the last event. + let mut last = None; + while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { + last = Some(item); + } + if let Some(notification) = last { + network.on_block_finalized(notification.hash, notification.header); + } + + // Poll the RPC requests and answer them. + while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { + match request { + rpc::apis::system::Request::Health(sender) => { + let _ = sender.send(rpc::apis::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.service().is_major_syncing(), + should_have_peers, + }); + }, + rpc::apis::system::Request::Peers(sender) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| + rpc::apis::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ).collect()); + } + rpc::apis::system::Request::NetworkState(sender) => { + let _ = sender.send(network.network_state()); + } + }; + } + + // Interval report for the external API. + while let Ok(Async::Ready(_)) = status_interval.poll() { + let status = NetworkStatus { + sync_state: network.sync_state(), + best_seen_block: network.best_seen_block(), + num_sync_peers: network.num_sync_peers(), + num_connected_peers: network.num_connected_peers(), + num_active_peers: network.num_active_peers(), + average_download_per_sec: network.average_download_per_sec(), + average_upload_per_sec: network.average_upload_per_sec(), + }; + let state = network.network_state(); + + status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); + } + + let authorities = client.runtime_api().authorities(&BlockId::hash(client.info().chain.best_hash)); + let valid_authority = |a: &libp2p::multihash::Multihash| { + match &authorities { + Ok(authorities) => { + for authority in authorities.iter() { + let hashed_public_key = libp2p::multihash::encode( + libp2p::multihash::Hash::SHA2256, + authority.to_string().as_bytes(), + ).expect("public key hashing not to fail"); + + // TODO: Comparing two pointers is safe, right? Given they are not fat-pointers. + if a == &hashed_public_key { + return Some(authority.clone()); + } + } + }, + // TODO: Should we handle the error here? + Err(_e) => {}, + } + + return None; + }; + + // TODO: Can we do this nicer? + let network_service = network.service().clone(); + let add_reserved_peer = |values: Vec<(libp2p::multihash::Multihash, Vec)>| { + for (key, value) in values.iter() { + // TODO: Should we log if it is not a valid one? + if let Some(authority_pub_key) = valid_authority(key) { + println!("===== adding other node"); + let value = std::str::from_utf8(value).expect("value to string not to fail"); + + let (addresses, signature): (Vec, Vec) = serde_json::from_str(value).expect("payload unmarshaling not to fail"); + + // TODO: is using verify-weak a problem here? + if <::Factory as ServiceFactory>::ConsensusPair::verify_weak( + &signature, + &serde_json::to_string(&addresses) + .map(|s| s.into_bytes()) + .expect("address marshaling not to fail"), + authority_pub_key, + ) { + for address in addresses.iter() { + // TODO: Why does add_reserved_peer take a string? + // TODO: Remove unwrap. + network_service.add_reserved_peer(address.to_string()).expect("adding reserved peer not to fail"); + } + } else { + // TODO: Log, don't print. + println!("==== signature not valid"); + } + } else { + println!("==== Did not find a match for the key"); + } + } + }; + + // Main network polling. + while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + match event { + DhtEvent::ValueFound(values) => add_reserved_peer(values), + DhtEvent::ValueNotFound(_h) => println!("==== Didn't find hash"), + DhtEvent::ValuePut(_h) => {}, + DhtEvent::ValuePutFailed(_h) => println!("==== failed to put value on DHT"), + } + }; + + // Now some diagnostic for performances. + let polling_dur = before_polling.elapsed(); + log!( + target: "service", + if polling_dur >= Duration::from_millis(50) { Level::Warn } else { Level::Trace }, + "Polling the network future took {:?}", + polling_dur + ); + + Ok(Async::NotReady) + })) + } + +} + /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> + Send + 'static + StartRPC + + NetworkFutureBuilder + MaintainTransactionPool + OffchainWorker {} @@ -282,6 +539,7 @@ impl ServiceTrait for T where + Send + 'static + StartRPC + + NetworkFutureBuilder + MaintainTransactionPool + OffchainWorker {} @@ -321,6 +579,9 @@ pub trait ServiceFactory: 'static + Sized { type LightImportQueue: ImportQueue + 'static; /// The Fork Choice Strategy for the chain type SelectChain: SelectChain + 'static; + /// + // TODO: Are all of these trait bounds necessary? + type AuthorityId: primitives::crypto::Public + std::hash::Hash + parity_codec::Codec + std::string::ToString; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 9f5de4de1bad4..f72779d7f7a54 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -29,9 +29,10 @@ use std::io; use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; -use std::time::{Duration, Instant}; use futures::sync::mpsc; use parking_lot::Mutex; +use client::{self}; +use crate::components::NetworkFutureBuilder; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; @@ -39,7 +40,7 @@ use futures::prelude::*; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use keystore::Store as Keystore; use network::{NetworkState, NetworkStateInfo}; -use log::{log, info, warn, debug, error, Level}; +use log::{info, warn, debug, error}; use parity_codec::{Encode, Decode}; use primitives::{Pair, ed25519, sr25519, crypto}; use sr_primitives::generic::BlockId; @@ -209,6 +210,7 @@ impl Service { } let (client, on_demand) = Components::build_client(&config, executor)?; + let select_chain = Components::build_select_chain(&mut config, client.clone())?; let (import_queue, finality_proof_request_builder) = Components::build_import_queue( &mut config, @@ -426,12 +428,13 @@ impl Service { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers::(&config, gen_handler)?; - let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future::( + let _ = to_spawn_tx.unbounded_send(Box::new(Components::RuntimeServices::build_network_future( network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, - has_bootnodes + has_bootnodes, + keystore_authority_key.clone(), ) .map_err(|_| ()) .select(exit.clone()) @@ -624,109 +627,6 @@ impl Executor + Send>> } } -/// Builds a never-ending future that continuously polls the network. -/// -/// The `status_sink` contain a list of senders to send a periodic network status to. -fn build_network_future< - Components: components::Components, - S: network::specialization::NetworkSpecialization>, - H: network::ExHashT -> ( - mut network: network::NetworkWorker, S, H>, - client: Arc>, - status_sinks: Arc>, NetworkState)>>>>, - mut rpc_rx: mpsc::UnboundedReceiver>>, - should_have_peers: bool, -) -> impl Future { - // Interval at which we send status updates on the status stream. - const STATUS_INTERVAL: Duration = Duration::from_millis(5000); - let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); - - let mut imported_blocks_stream = client.import_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - let mut finality_notification_stream = client.finality_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - - futures::future::poll_fn(move || { - let before_polling = Instant::now(); - - // We poll `imported_blocks_stream`. - while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { - network.on_block_imported(notification.hash, notification.header); - } - - // We poll `finality_notification_stream`, but we only take the last event. - let mut last = None; - while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { - last = Some(item); - } - if let Some(notification) = last { - network.on_block_finalized(notification.hash, notification.header); - } - - // Poll the RPC requests and answer them. - while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { - match request { - rpc::apis::system::Request::Health(sender) => { - let _ = sender.send(rpc::apis::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.service().is_major_syncing(), - should_have_peers, - }); - }, - rpc::apis::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - rpc::apis::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - ).collect()); - } - rpc::apis::system::Request::NetworkState(sender) => { - let _ = sender.send(network.network_state()); - } - }; - } - - // Interval report for the external API. - while let Ok(Async::Ready(_)) = status_interval.poll() { - let status = NetworkStatus { - sync_state: network.sync_state(), - best_seen_block: network.best_seen_block(), - num_sync_peers: network.num_sync_peers(), - num_connected_peers: network.num_connected_peers(), - num_active_peers: network.num_active_peers(), - average_download_per_sec: network.average_download_per_sec(), - average_upload_per_sec: network.average_upload_per_sec(), - }; - let state = network.network_state(); - - status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok()); - } - - // Main network polling. - match network.poll() { - Ok(Async::NotReady) => {} - Err(err) => warn!(target: "service", "Error in network: {:?}", err), - Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"), - } - - // Now some diagnostic for performances. - let polling_dur = before_polling.elapsed(); - log!( - target: "service", - if polling_dur >= Duration::from_millis(50) { Level::Warn } else { Level::Trace }, - "Polling the network future took {:?}", - polling_dur - ); - - Ok(Async::NotReady) - }) -} - /// Overview status of the network. #[derive(Clone)] pub struct NetworkStatus { @@ -1082,6 +982,8 @@ macro_rules! construct_service_factory { SelectChain = $select_chain:ty { $( $select_chain_init:tt )* }, FinalityProofProvider = { $( $finality_proof_provider_init:tt )* }, + AuthorityId = $authority_id:ty, + } ) => { $( #[$attr] )* @@ -1104,6 +1006,7 @@ macro_rules! construct_service_factory { type FullImportQueue = $full_import_queue; type LightImportQueue = $light_import_queue; type SelectChain = $select_chain; + type AuthorityId = $authority_id; fn build_full_transaction_pool( config: $crate::TransactionPoolOptions, diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 6c45f45d008cb..c35ad4f6516f2 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -22,14 +22,13 @@ use std::sync::Arc; use std::time::Duration; use babe::{import_queue, start_babe, BabeImportQueue, Config}; -use babe_primitives::AuthorityPair as BabePair; +use babe_primitives::{AuthorityPair as BabePair, AuthorityId}; use client::{self, LongestChain}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use node_executor; use primitives::Pair; use grandpa_primitives::AuthorityPair as GrandpaPair; use futures::prelude::*; -use node_primitives::Block; use node_runtime::{GenesisConfig, RuntimeApi}; use substrate_service::{ FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, @@ -42,6 +41,7 @@ use network::construct_simple_protocol; use substrate_service::construct_service_factory; use log::info; use substrate_service::TelemetryOnConnect; +use node_primitives::{Block}; construct_simple_protocol! { /// Demo protocol attachment for substrate. @@ -253,6 +253,7 @@ construct_service_factory! { FinalityProofProvider = { |client: Arc>| { Ok(Some(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _)) }}, + AuthorityId = AuthorityId, } }