Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 74 additions & 11 deletions crates/bit_rev/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::sync::Arc;

use crate::file::{self, TorrentMeta};
use crate::peer_state::PeerStates;
use crate::torrent::Torrent;
use crate::tracker_peers::TrackerPeers;
use crate::utils;
use dashmap::DashMap;
use flume::Receiver;

#[derive(Debug, Clone, Copy)]
Expand All @@ -27,19 +30,68 @@ pub struct State {
}

pub struct Session {
pub tracker_stream: TrackerPeers,
pub streams: DashMap<[u8; 20], TrackerPeers>,
}

pub struct AddTorrentOptions {
torrent_meta: TorrentMeta,
}

impl AddTorrentOptions {
fn from_meta(torrent_meta: TorrentMeta) -> Self {
Self { torrent_meta }
}

fn from_path(path: &str) -> Self {
let torrent_meta = file::from_filename(path).unwrap();
Self { torrent_meta }
}
}

impl From<TorrentMeta> for AddTorrentOptions {
fn from(torrent_meta: TorrentMeta) -> Self {
Self::from_meta(torrent_meta)
}
}

impl From<&str> for AddTorrentOptions {
fn from(path: &str) -> Self {
Self::from_path(path)
}
}

pub struct AddTorrentResult {
pub torrent: Torrent,
pub torrent_meta: TorrentMeta,
pub pr_rx: Receiver<PieceResult>,
}

impl Session {
pub async fn download_torrent(
torrent: Torrent,
tracker_stream: TrackerPeers,
have_broadcast: Arc<tokio::sync::broadcast::Sender<u32>>,
) -> Self {
let piece_rx = tracker_stream.piece_rx.clone();
pub fn new() -> Self {
Self {
streams: DashMap::new(),
}
}

pub async fn add_torrent(
&self,
add_torrent: AddTorrentOptions,
) -> anyhow::Result<AddTorrentResult> {
let torrent = Torrent::new(&add_torrent.torrent_meta.clone());
let torrent_meta = add_torrent.torrent_meta.clone();
let (pr_tx, pr_rx) = flume::bounded::<PieceResult>(torrent.piece_hashes.len());
//let (pr_tx, pr_rx) = flume::unbounded::<PieceResult>();
let have_broadcast = Arc::new(tokio::sync::broadcast::channel(128).0);
let peer_states = Arc::new(PeerStates::default());
let random_peers = utils::generate_peer_id();

let tracker_stream = TrackerPeers::new(
torrent_meta.clone(),
15,
random_peers,
peer_states,
have_broadcast.clone(),
pr_rx.clone(),
);

let pieces_of_work = (0..(torrent.piece_hashes.len()) as u64)
.map(|index| {
Expand All @@ -55,6 +107,7 @@ impl Session {
tracker_stream.connect(pieces_of_work).await;

let have_broadcast = have_broadcast.clone();
let piece_rx = tracker_stream.piece_rx.clone();

tokio::spawn(async move {
loop {
Expand All @@ -72,9 +125,19 @@ impl Session {
}
});

Self {
tracker_stream,
self.streams
.insert(torrent.info_hash, tracker_stream.clone());

Ok(AddTorrentResult {
torrent,
torrent_meta,
pr_rx,
}
})
}
}

impl Default for Session {
fn default() -> Self {
Self::new()
}
}
5 changes: 4 additions & 1 deletion crates/bit_rev/src/tracker_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
},
peer_state::PeerStates,
protocol_udp::request_udp_peers,
session::PieceWork,
session::{PieceResult, PieceWork},
};

#[derive(Debug, Clone)]
Expand All @@ -21,6 +21,7 @@ pub struct TrackerPeers {
pub peer_states: Arc<PeerStates>,
pub piece_tx: flume::Sender<FullPiece>,
pub piece_rx: flume::Receiver<FullPiece>,
pub pr_rx: flume::Receiver<PieceResult>,
pub have_broadcast: Arc<tokio::sync::broadcast::Sender<u32>>,
}

Expand All @@ -31,13 +32,15 @@ impl TrackerPeers {
peer_id: [u8; 20],
peer_states: Arc<PeerStates>,
have_broadcast: Arc<tokio::sync::broadcast::Sender<u32>>,
pr_rx: flume::Receiver<PieceResult>,
) -> TrackerPeers {
let (sender, receiver) = flume::unbounded();
TrackerPeers {
torrent_meta,
peer_id,
piece_tx: sender,
piece_rx: receiver,
pr_rx,
peer_states,
have_broadcast,
}
Expand Down
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ default = []
tokio-console = ["console-subscriber"]

[dependencies]
anyhow.workspace = true
tokio.workspace = true
bit_rev.workspace = true
indicatif.workspace = true
Expand Down
53 changes: 16 additions & 37 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ use tokio::{
};
use tracing::trace;

use bit_rev::{
file::{self, TorrentMeta},
session::Session,
torrent::Torrent,
tracker_peers::TrackerPeers,
utils,
};
use bit_rev::{session::Session, utils};

#[tokio::main]
async fn main() {
Expand All @@ -29,34 +23,17 @@ async fn main() {
let filename = std::env::args().nth(1).expect("No torrent path given");
let output = std::env::args().nth(2);

let torrent_meta = file::from_filename(&filename).unwrap();

download_file(torrent_meta, output).await
if let Err(err) = download_file(&filename, output).await {
eprintln!("Error: {:?}", err);
}
}

pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>) {
let random_peers = utils::generate_peer_id();
pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::Result<()> {
let session = Session::new();

let torrent = Torrent::new(&torrent_meta.clone());

let peer_states = Arc::new(bit_rev::peer_state::PeerStates::default());
let (have_broadcast, _) = tokio::sync::broadcast::channel(128);
let have_broadcast = Arc::new(have_broadcast);

//TODO: move it to a download manager state
let tracker_stream = TrackerPeers::new(
torrent_meta.clone(),
15,
random_peers,
peer_states,
have_broadcast.clone(),
);

//TODO: I think this is really bad

//TODO: return more than just the buffer
let downloader =
Session::download_torrent(torrent.clone(), tracker_stream.clone(), have_broadcast).await;
let add_torrent_result = session.add_torrent(filename.into()).await?;
let torrent = add_torrent_result.torrent.clone();
let torrent_meta = add_torrent_result.torrent_meta;

let total_size = torrent.length as u64;
let pb = ProgressBar::new(total_size);
Expand All @@ -74,7 +51,7 @@ pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>)
Some(name) => name,
None => torrent_meta.clone().torrent_file.info.name.clone(),
};
let mut file = File::create(out_filename).await.unwrap();
let mut file = File::create(out_filename).await?;

// File
let total_downloaded = Arc::new(AtomicU64::new(0));
Expand All @@ -91,7 +68,7 @@ pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>)

let mut hashset = std::collections::HashSet::new();
while hashset.len() < torrent.piece_hashes.len() {
let pr = downloader.pr_rx.recv_async().await.unwrap();
let pr = add_torrent_result.pr_rx.recv_async().await?;

hashset.insert(pr.index);
let (start, end) = utils::calculate_bounds_for_piece(&torrent, pr.index as usize);
Expand All @@ -102,11 +79,13 @@ pub async fn download_file(torrent_meta: TorrentMeta, out_file: Option<String>)
end,
pr.length
);
file.seek(SeekFrom::Start(start as u64)).await.unwrap();
file.write_all(pr.buf.as_slice()).await.unwrap();
file.seek(SeekFrom::Start(start as u64)).await?;
file.write_all(pr.buf.as_slice()).await?;

total_downloaded.fetch_add(pr.length as u64, std::sync::atomic::Ordering::Relaxed);
}

file.sync_all().await.unwrap()
file.sync_all().await?;

Ok(())
}
Loading