Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 162486c

Browse files
montekkirphmeier
andauthored
Overseer (#1152)
* Initial commit * Licenses, spaces, docs * Add a spawner * Watch spawned subsystems with a FuturesUnordered * Move the types around a bit * Suggested fixes by Max * Add a handler to talk to the Overseer * FromOverseer and ToOverseer msgs and stopping * Docs and return errors * Dont broadcast, have add a from field to messages * Allow communication between subsystems and outside world * A message with a oneshot to send result example * Remove leftover can_recv_msg * Remove from field from messages * Dont be generic over stuff * Gather messages with StreamUnordered * Fix comments and formatting * More docs fixes and an example * Apply suggestions from code review Co-authored-by: Robert Habermeier <[email protected]> * Fixes from review Move function from impl block. Do not panic but resolve with errors if spawner fails or subsystem resolves. * Dropping a handler results in a flaky test Co-authored-by: Robert Habermeier <[email protected]>
1 parent 0ceb54e commit 162486c

File tree

5 files changed

+962
-0
lines changed

5 files changed

+962
-0
lines changed

Cargo.lock

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ members = [
2727
"erasure-coding",
2828
"network",
2929
"network/test",
30+
"overseer",
3031
"primitives",
3132
"runtime/common",
3233
"runtime/polkadot",

overseer/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "overseer"
3+
version = "0.1.0"
4+
authors = ["Parity Technologies <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
futures = "0.3.5"
9+
log = "0.4.8"
10+
futures-timer = "3.0.2"
11+
streamunordered = "0.5.1"
12+
13+
[dev-dependencies]
14+
futures = { version = "0.3.5", features = ["thread-pool"] }
15+
futures-timer = "3.0.2"
16+
femme = "2.0.1"
17+
log = "0.4.8"
18+
kv-log-macro = "1.0.6"
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2020 Parity Technologies (UK) Ltd.
2+
// This file is part of Polkadot.
3+
4+
// Polkadot is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
9+
// Polkadot is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
14+
// You should have received a copy of the GNU General Public License
15+
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
16+
17+
//! Shows a basic usage of the `Overseer`:
18+
//! * Spawning subsystems and subsystem child jobs
19+
//! * Establishing message passing
20+
21+
use std::time::Duration;
22+
use futures::{
23+
pending, pin_mut, executor, select, stream,
24+
FutureExt, StreamExt,
25+
};
26+
use futures_timer::Delay;
27+
use kv_log_macro as log;
28+
29+
use overseer::{
30+
AllMessages, CandidateBackingSubsystemMessage, FromOverseer,
31+
Overseer, Subsystem, SubsystemContext, SpawnedSubsystem, ValidationSubsystemMessage,
32+
};
33+
34+
struct Subsystem1;
35+
36+
impl Subsystem1 {
37+
async fn run(mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) {
38+
loop {
39+
match ctx.try_recv().await {
40+
Ok(Some(msg)) => {
41+
if let FromOverseer::Communication { msg } = msg {
42+
log::info!("msg {:?}", msg);
43+
}
44+
continue;
45+
}
46+
Ok(None) => (),
47+
Err(_) => {
48+
log::info!("exiting");
49+
return;
50+
}
51+
}
52+
53+
Delay::new(Duration::from_secs(1)).await;
54+
ctx.send_msg(AllMessages::Validation(
55+
ValidationSubsystemMessage::ValidityAttestation
56+
)).await.unwrap();
57+
}
58+
}
59+
}
60+
61+
impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
62+
fn start(&mut self, ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
63+
SpawnedSubsystem(Box::pin(async move {
64+
Self::run(ctx).await;
65+
}))
66+
}
67+
}
68+
69+
struct Subsystem2;
70+
71+
impl Subsystem2 {
72+
async fn run(mut ctx: SubsystemContext<ValidationSubsystemMessage>) {
73+
ctx.spawn(Box::pin(async {
74+
loop {
75+
log::info!("Job tick");
76+
Delay::new(Duration::from_secs(1)).await;
77+
}
78+
})).await.unwrap();
79+
80+
loop {
81+
match ctx.try_recv().await {
82+
Ok(Some(msg)) => {
83+
log::info!("Subsystem2 received message {:?}", msg);
84+
continue;
85+
}
86+
Ok(None) => { pending!(); }
87+
Err(_) => {
88+
log::info!("exiting");
89+
return;
90+
},
91+
}
92+
}
93+
}
94+
}
95+
96+
impl Subsystem<ValidationSubsystemMessage> for Subsystem2 {
97+
fn start(&mut self, ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
98+
SpawnedSubsystem(Box::pin(async move {
99+
Self::run(ctx).await;
100+
}))
101+
}
102+
}
103+
104+
fn main() {
105+
femme::with_level(femme::LevelFilter::Trace);
106+
let spawner = executor::ThreadPool::new().unwrap();
107+
108+
futures::executor::block_on(async {
109+
let timer_stream = stream::repeat(()).then(|_| async {
110+
Delay::new(Duration::from_secs(1)).await;
111+
});
112+
113+
let (overseer, _handler) = Overseer::new(
114+
Box::new(Subsystem2),
115+
Box::new(Subsystem1),
116+
spawner,
117+
).unwrap();
118+
let overseer_fut = overseer.run().fuse();
119+
let timer_stream = timer_stream;
120+
121+
pin_mut!(timer_stream);
122+
pin_mut!(overseer_fut);
123+
124+
loop {
125+
select! {
126+
_ = overseer_fut => break,
127+
_ = timer_stream.next() => {
128+
log::info!("tick");
129+
}
130+
complete => break,
131+
}
132+
}
133+
});
134+
}

0 commit comments

Comments
 (0)