Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 62bd312

Browse files
authored
Merge pull request paritytech#442 from subspace/farmer-bench-command
Initial benchmarking support for the farmer
2 parents 12ffeba + 415032f commit 62bd312

File tree

8 files changed

+433
-79
lines changed

8 files changed

+433
-79
lines changed

crates/subspace-farmer/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ subspace-solving = { version = "0.1.0", path = "../subspace-solving" }
4242
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
4343
subspace-networking = { version = "0.1.0", path = "../subspace-networking" }
4444
subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" }
45+
tempfile = "3.3.0"
4546
thiserror = "1.0.30"
4647
tokio = { version = "1.17.0", features = ["macros", "parking_lot", "rt-multi-thread"] }
4748
zeroize = "1.5.4"
@@ -58,9 +59,6 @@ default-features = false
5859
features = ["snappy", "jemalloc"]
5960
version = "0.18.0"
6061

61-
[dev-dependencies]
62-
tempfile = "3.3.0"
63-
6462
[features]
6563
default = []
6664
# Compile with CUDA support and use it if compatible GPU is available
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use std::sync::Arc;
2+
3+
use async_trait::async_trait;
4+
use subspace_archiving::archiver::ArchivedSegment;
5+
use subspace_core_primitives::BlockNumber;
6+
use subspace_rpc_primitives::{
7+
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
8+
};
9+
use tokio::sync::mpsc::Receiver;
10+
use tokio::sync::{mpsc, Mutex};
11+
use tokio::task::JoinHandle;
12+
13+
use subspace_farmer::{RpcClient, RpcClientError as MockError};
14+
15+
/// Client mock for benching purpose
16+
#[derive(Clone, Debug)]
17+
pub struct BenchRpcClient {
18+
inner: Arc<Inner>,
19+
}
20+
21+
#[derive(Debug)]
22+
pub struct Inner {
23+
metadata: FarmerMetadata,
24+
acknowledge_archived_segment_sender: mpsc::Sender<u64>,
25+
archived_segments_receiver: Arc<Mutex<mpsc::Receiver<ArchivedSegment>>>,
26+
segment_producer_handle: Mutex<JoinHandle<()>>,
27+
}
28+
29+
impl BenchRpcClient {
30+
/// Create a new instance of [`BenchRpcClient`].
31+
pub fn new(
32+
metadata: FarmerMetadata,
33+
mut archived_segments_receiver: mpsc::Receiver<ArchivedSegment>,
34+
) -> Self {
35+
let (inner_archived_segments_sender, inner_archived_segments_receiver) = mpsc::channel(10);
36+
let (acknowledge_archived_segment_sender, mut acknowledge_archived_segment_receiver) =
37+
mpsc::channel(1);
38+
39+
let segment_producer_handle = tokio::spawn({
40+
async move {
41+
while let Some(segment) = archived_segments_receiver.recv().await {
42+
if inner_archived_segments_sender.send(segment).await.is_err() {
43+
break;
44+
}
45+
if acknowledge_archived_segment_receiver.recv().await.is_none() {
46+
break;
47+
}
48+
}
49+
}
50+
});
51+
52+
Self {
53+
inner: Arc::new(Inner {
54+
metadata,
55+
archived_segments_receiver: Arc::new(Mutex::new(inner_archived_segments_receiver)),
56+
acknowledge_archived_segment_sender,
57+
segment_producer_handle: Mutex::new(segment_producer_handle),
58+
}),
59+
}
60+
}
61+
62+
pub async fn stop(self) {
63+
self.inner.segment_producer_handle.lock().await.abort();
64+
}
65+
}
66+
67+
#[async_trait]
68+
impl RpcClient for BenchRpcClient {
69+
async fn farmer_metadata(&self) -> Result<FarmerMetadata, MockError> {
70+
Ok(self.inner.metadata.clone())
71+
}
72+
73+
async fn best_block_number(&self) -> Result<BlockNumber, MockError> {
74+
// Doesn't matter for tests (at least yet)
75+
Ok(BlockNumber::MAX)
76+
}
77+
78+
async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
79+
unreachable!("Unreachable, as we don't start farming for benchmarking")
80+
}
81+
82+
async fn submit_solution_response(
83+
&self,
84+
_solution_response: SolutionResponse,
85+
) -> Result<(), MockError> {
86+
unreachable!("Unreachable, as we don't start farming for benchmarking")
87+
}
88+
89+
async fn subscribe_block_signing(&self) -> Result<Receiver<BlockSigningInfo>, MockError> {
90+
unreachable!("Unreachable, as we don't start farming for benchmarking")
91+
}
92+
93+
async fn submit_block_signature(
94+
&self,
95+
_block_signature: BlockSignature,
96+
) -> Result<(), MockError> {
97+
unreachable!("Unreachable, as we don't start farming for benchmarking")
98+
}
99+
100+
async fn subscribe_archived_segments(&self) -> Result<Receiver<ArchivedSegment>, MockError> {
101+
let (sender, receiver) = mpsc::channel(10);
102+
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
103+
tokio::spawn(async move {
104+
while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await
105+
{
106+
if sender.send(archived_segment).await.is_err() {
107+
break;
108+
}
109+
}
110+
});
111+
112+
Ok(receiver)
113+
}
114+
115+
async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> {
116+
self.inner
117+
.acknowledge_archived_segment_sender
118+
.send(segment_index)
119+
.await?;
120+
Ok(())
121+
}
122+
}

crates/subspace-farmer/src/bin/subspace-farmer/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod farm;
22

3-
pub(crate) use farm::farm;
3+
pub(crate) use farm::{bench, farm};
44
use log::info;
55
use std::path::Path;
66
use std::{fs, io};

0 commit comments

Comments
 (0)