Skip to content

Commit d0a1761

Browse files
committed
feat: support multiples torrents
1 parent 5af205c commit d0a1761

File tree

5 files changed

+96
-49
lines changed

5 files changed

+96
-49
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/bit_rev/src/session.rs

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::sync::Arc;
22

3+
use crate::file::{self, TorrentMeta};
4+
use crate::peer_state::PeerStates;
35
use crate::torrent::Torrent;
46
use crate::tracker_peers::TrackerPeers;
57
use crate::utils;
8+
use dashmap::DashMap;
69
use flume::Receiver;
710

811
#[derive(Debug, Clone, Copy)]
@@ -27,19 +30,68 @@ pub struct State {
2730
}
2831

2932
pub struct Session {
30-
pub tracker_stream: TrackerPeers,
33+
pub streams: DashMap<[u8; 20], TrackerPeers>,
34+
}
35+
36+
pub struct AddTorrentOptions {
37+
torrent_meta: TorrentMeta,
38+
}
39+
40+
impl AddTorrentOptions {
41+
fn from_meta(torrent_meta: TorrentMeta) -> Self {
42+
Self { torrent_meta }
43+
}
44+
45+
fn from_path(path: &str) -> Self {
46+
let torrent_meta = file::from_filename(path).unwrap();
47+
Self { torrent_meta }
48+
}
49+
}
50+
51+
impl From<TorrentMeta> for AddTorrentOptions {
52+
fn from(torrent_meta: TorrentMeta) -> Self {
53+
Self::from_meta(torrent_meta)
54+
}
55+
}
56+
57+
impl From<&str> for AddTorrentOptions {
58+
fn from(path: &str) -> Self {
59+
Self::from_path(path)
60+
}
61+
}
62+
63+
pub struct AddTorrentResult {
64+
pub torrent: Torrent,
65+
pub torrent_meta: TorrentMeta,
3166
pub pr_rx: Receiver<PieceResult>,
3267
}
3368

3469
impl Session {
35-
pub async fn download_torrent(
36-
torrent: Torrent,
37-
tracker_stream: TrackerPeers,
38-
have_broadcast: Arc<tokio::sync::broadcast::Sender<u32>>,
39-
) -> Self {
40-
let piece_rx = tracker_stream.piece_rx.clone();
70+
pub fn new() -> Self {
71+
Self {
72+
streams: DashMap::new(),
73+
}
74+
}
75+
76+
pub async fn add_torrent(
77+
&self,
78+
add_torrent: AddTorrentOptions,
79+
) -> anyhow::Result<AddTorrentResult> {
80+
let torrent = Torrent::new(&add_torrent.torrent_meta.clone());
81+
let torrent_meta = add_torrent.torrent_meta.clone();
4182
let (pr_tx, pr_rx) = flume::bounded::<PieceResult>(torrent.piece_hashes.len());
42-
//let (pr_tx, pr_rx) = flume::unbounded::<PieceResult>();
83+
let have_broadcast = Arc::new(tokio::sync::broadcast::channel(128).0);
84+
let peer_states = Arc::new(PeerStates::default());
85+
let random_peers = utils::generate_peer_id();
86+
87+
let tracker_stream = TrackerPeers::new(
88+
torrent_meta.clone(),
89+
15,
90+
random_peers,
91+
peer_states,
92+
have_broadcast.clone(),
93+
pr_rx.clone(),
94+
);
4395

4496
let pieces_of_work = (0..(torrent.piece_hashes.len()) as u64)
4597
.map(|index| {
@@ -55,6 +107,7 @@ impl Session {
55107
tracker_stream.connect(pieces_of_work).await;
56108

57109
let have_broadcast = have_broadcast.clone();
110+
let piece_rx = tracker_stream.piece_rx.clone();
58111

59112
tokio::spawn(async move {
60113
loop {
@@ -72,9 +125,19 @@ impl Session {
72125
}
73126
});
74127

75-
Self {
76-
tracker_stream,
128+
self.streams
129+
.insert(torrent.info_hash, tracker_stream.clone());
130+
131+
Ok(AddTorrentResult {
132+
torrent,
133+
torrent_meta,
77134
pr_rx,
78-
}
135+
})
136+
}
137+
}
138+
139+
impl Default for Session {
140+
fn default() -> Self {
141+
Self::new()
79142
}
80143
}

crates/bit_rev/src/tracker_peers.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
},
1212
peer_state::PeerStates,
1313
protocol_udp::request_udp_peers,
14-
session::PieceWork,
14+
session::{PieceResult, PieceWork},
1515
};
1616

1717
#[derive(Debug, Clone)]
@@ -21,6 +21,7 @@ pub struct TrackerPeers {
2121
pub peer_states: Arc<PeerStates>,
2222
pub piece_tx: flume::Sender<FullPiece>,
2323
pub piece_rx: flume::Receiver<FullPiece>,
24+
pub pr_rx: flume::Receiver<PieceResult>,
2425
pub have_broadcast: Arc<tokio::sync::broadcast::Sender<u32>>,
2526
}
2627

@@ -31,13 +32,15 @@ impl TrackerPeers {
3132
peer_id: [u8; 20],
3233
peer_states: Arc<PeerStates>,
3334
have_broadcast: Arc<tokio::sync::broadcast::Sender<u32>>,
35+
pr_rx: flume::Receiver<PieceResult>,
3436
) -> TrackerPeers {
3537
let (sender, receiver) = flume::unbounded();
3638
TrackerPeers {
3739
torrent_meta,
3840
peer_id,
3941
piece_tx: sender,
4042
piece_rx: receiver,
43+
pr_rx,
4144
peer_states,
4245
have_broadcast,
4346
}

crates/cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ default = []
1212
tokio-console = ["console-subscriber"]
1313

1414
[dependencies]
15+
anyhow.workspace = true
1516
tokio.workspace = true
1617
bit_rev.workspace = true
1718
indicatif.workspace = true

crates/cli/src/main.rs

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,7 @@ use tokio::{
1010
};
1111
use tracing::trace;
1212

13-
use bit_rev::{
14-
file::{self, TorrentMeta},
15-
session::Session,
16-
torrent::Torrent,
17-
tracker_peers::TrackerPeers,
18-
utils,
19-
};
13+
use bit_rev::{session::Session, utils};
2014

2115
#[tokio::main]
2216
async fn main() {
@@ -29,34 +23,17 @@ async fn main() {
2923
let filename = std::env::args().nth(1).expect("No torrent path given");
3024
let output = std::env::args().nth(2);
3125

32-
let torrent_meta = file::from_filename(&filename).unwrap();
33-
34-
download_file(torrent_meta, output).await
26+
if let Err(err) = download_file(&filename, output).await {
27+
eprintln!("Error: {:?}", err);
28+
}
3529
}
3630

37-
pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>) {
38-
let random_peers = utils::generate_peer_id();
31+
pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::Result<()> {
32+
let session = Session::new();
3933

40-
let torrent = Torrent::new(&torrent_meta.clone());
41-
42-
let peer_states = Arc::new(bit_rev::peer_state::PeerStates::default());
43-
let (have_broadcast, _) = tokio::sync::broadcast::channel(128);
44-
let have_broadcast = Arc::new(have_broadcast);
45-
46-
//TODO: move it to a download manager state
47-
let tracker_stream = TrackerPeers::new(
48-
torrent_meta.clone(),
49-
15,
50-
random_peers,
51-
peer_states,
52-
have_broadcast.clone(),
53-
);
54-
55-
//TODO: I think this is really bad
56-
57-
//TODO: return more than just the buffer
58-
let downloader =
59-
Session::download_torrent(torrent.clone(), tracker_stream.clone(), have_broadcast).await;
34+
let add_torrent_result = session.add_torrent(filename.into()).await?;
35+
let torrent = add_torrent_result.torrent.clone();
36+
let torrent_meta = add_torrent_result.torrent_meta;
6037

6138
let total_size = torrent.length as u64;
6239
let pb = ProgressBar::new(total_size);
@@ -74,7 +51,7 @@ pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>)
7451
Some(name) => name,
7552
None => torrent_meta.clone().torrent_file.info.name.clone(),
7653
};
77-
let mut file = File::create(out_filename).await.unwrap();
54+
let mut file = File::create(out_filename).await?;
7855

7956
// File
8057
let total_downloaded = Arc::new(AtomicU64::new(0));
@@ -91,7 +68,7 @@ pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>)
9168

9269
let mut hashset = std::collections::HashSet::new();
9370
while hashset.len() < torrent.piece_hashes.len() {
94-
let pr = downloader.pr_rx.recv_async().await.unwrap();
71+
let pr = add_torrent_result.pr_rx.recv_async().await?;
9572

9673
hashset.insert(pr.index);
9774
let (start, end) = utils::calculate_bounds_for_piece(&torrent, pr.index as usize);
@@ -102,11 +79,13 @@ pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>)
10279
end,
10380
pr.length
10481
);
105-
file.seek(SeekFrom::Start(start as u64)).await.unwrap();
106-
file.write_all(pr.buf.as_slice()).await.unwrap();
82+
file.seek(SeekFrom::Start(start as u64)).await?;
83+
file.write_all(pr.buf.as_slice()).await?;
10784

10885
total_downloaded.fetch_add(pr.length as u64, std::sync::atomic::Ordering::Relaxed);
10986
}
11087

111-
file.sync_all().await.unwrap()
88+
file.sync_all().await?;
89+
90+
Ok(())
11291
}

0 commit comments

Comments
 (0)