Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions iroh/examples/locally-discovered-nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well.
use std::time::Duration;

use iroh::{Endpoint, NodeId, node_info::UserData};
use iroh::{Endpoint, NodeId, discovery::DiscoveryEvent, node_info::UserData};
use n0_future::StreamExt;
use n0_snafu::Result;
use tokio::task::JoinSet;
Expand All @@ -32,7 +32,7 @@ async fn main() -> Result<()> {
tracing::error!("{e}");
return;
}
Ok(item) => {
Ok(DiscoveryEvent::Discovered(item)) => {
// if there is no user data, or the user data
// does not indicate that the discovered node
// is a part of the example, ignore it
Expand All @@ -53,6 +53,7 @@ async fn main() -> Result<()> {
println!("Found node {}!", item.node_id().fmt_short());
}
}
Ok(DiscoveryEvent::Expired(_)) => {}
};
}
});
Expand Down
65 changes: 43 additions & 22 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,23 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static {
/// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system
/// and add the discovered addresses to the internal address book as they arrive
/// on this stream.
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
None
}
}

impl<T: Discovery> Discovery for Arc<T> {}

/// An event emitted from [`Discovery`] services.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DiscoveryEvent {
/// A peer was discovered or it's information was updated.
Discovered(DiscoveryItem),
/// A peer was expired due to being inactive, unreachable, or otherwise
/// unavailable.
Expired(NodeId),
}

/// Node discovery results from [`Discovery`] services.
///
/// This is the item in the streams returned from [`Discovery::resolve`] and
Expand All @@ -367,7 +377,7 @@ impl<T: Discovery> Discovery for Arc<T> {}
///
/// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`]
/// directly from [`DiscoveryItem`].
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DiscoveryItem {
/// The node info for the node, as discovered by the the discovery service.
node_info: NodeInfo,
Expand Down Expand Up @@ -498,7 +508,7 @@ impl Discovery for ConcurrentDiscovery {
Some(Box::pin(streams))
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
let mut streams = vec![];
for service in self.services.iter() {
if let Some(stream) = service.subscribe() {
Expand Down Expand Up @@ -649,11 +659,13 @@ impl DiscoveryTask {
}
debug!(%provenance, addr = ?node_addr, "new address found");
ep.add_node_addr_with_source(node_addr, provenance).ok();

if let Some(tx) = on_first_tx.take() {
tx.send(Ok(())).ok();
}
// Send the discovery item to the subscribers of the discovery broadcast stream.
ep.discovery_subscribers().send(r);
ep.discovery_subscribers()
.send(DiscoveryEvent::Discovered(r));
}
Some(Err(err)) => {
warn!(?err, "discovery service produced error");
Expand Down Expand Up @@ -685,7 +697,7 @@ pub struct Lagged {

#[derive(Clone, Debug)]
pub(super) struct DiscoverySubscribers {
inner: tokio::sync::broadcast::Sender<DiscoveryItem>,
inner: tokio::sync::broadcast::Sender<DiscoveryEvent>,
}

impl DiscoverySubscribers {
Expand All @@ -699,13 +711,13 @@ impl DiscoverySubscribers {
}
}

pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryItem, Lagged>> + use<> {
pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryEvent, Lagged>> + use<> {
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
let recv = self.inner.subscribe();
BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n })
}

pub(crate) fn send(&self, item: DiscoveryItem) {
pub(crate) fn send(&self, item: DiscoveryEvent) {
// `broadcast::Sender::send` returns an error if the channel has no subscribers,
// which we don't care about.
self.inner.send(item).ok();
Expand Down Expand Up @@ -737,7 +749,7 @@ mod tests {
#[derive(Debug, Clone)]
struct TestDiscoveryShared {
nodes: Arc<Mutex<InfoStore>>,
watchers: tokio::sync::broadcast::Sender<DiscoveryItem>,
watchers: tokio::sync::broadcast::Sender<DiscoveryEvent>,
}

impl Default for TestDiscoveryShared {
Expand Down Expand Up @@ -770,7 +782,7 @@ mod tests {
}
}

pub fn send_passive(&self, item: DiscoveryItem) {
pub fn send_passive(&self, item: DiscoveryEvent) {
self.watchers.send(item).ok();
}
}
Expand Down Expand Up @@ -831,7 +843,7 @@ mod tests {
Some(stream)
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
let recv = self.shared.watchers.subscribe();
let stream =
tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|item| item.ok());
Expand Down Expand Up @@ -1016,25 +1028,34 @@ mod tests {
ep2.node_addr().initialized().await;
let _ = ep1.connect(ep2.node_id(), TEST_ALPN).await?;

let item = tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged");
let DiscoveryEvent::Discovered(item) =
tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged")
else {
panic!("Returned unexpected discovery event!");
};

assert_eq!(item.node_id(), ep2.node_id());
assert_eq!(item.provenance(), "test-disco");

// inject item into discovery passively
let passive_node_id = SecretKey::generate(rand::thread_rng()).public();
let node_info = NodeInfo::new(passive_node_id);
let passive_item = DiscoveryItem::new(node_info, "test-disco-passive", None);
disco_shared.send_passive(passive_item.clone());

let item = tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged");
disco_shared.send_passive(DiscoveryEvent::Discovered(passive_item.clone()));

let DiscoveryEvent::Discovered(item) =
tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged")
else {
panic!("Returned unexpected discovery event!");
};
assert_eq!(item.node_id(), passive_node_id);
assert_eq!(item.provenance(), "test-disco-passive");

Expand Down
112 changes: 92 additions & 20 deletions iroh/src/discovery/mdns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{Instrument, debug, error, info_span, trace, warn};

use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError};
use crate::discovery::{Discovery, DiscoveryItem, NodeData, NodeInfo};
use crate::discovery::{Discovery, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo};

/// The n0 local swarm node discovery name
const N0_LOCAL_SWARM: &str = "iroh.local.swarm";
Expand Down Expand Up @@ -83,27 +83,27 @@ enum Message {
Discovery(String, Peer),
Resolve(NodeId, mpsc::Sender<Result<DiscoveryItem, DiscoveryError>>),
Timeout(NodeId, usize),
Subscribe(mpsc::Sender<DiscoveryItem>),
Subscribe(mpsc::Sender<DiscoveryEvent>),
}

/// Manages the list of subscribers that are subscribed to this discovery service.
#[derive(Debug)]
struct Subscribers(Vec<mpsc::Sender<DiscoveryItem>>);
struct Subscribers(Vec<mpsc::Sender<DiscoveryEvent>>);

impl Subscribers {
fn new() -> Self {
Self(vec![])
}

/// Add the subscriber to the list of subscribers
fn push(&mut self, subscriber: mpsc::Sender<DiscoveryItem>) {
fn push(&mut self, subscriber: mpsc::Sender<DiscoveryEvent>) {
self.0.push(subscriber);
}

/// Sends the `node_id` and `item` to each subscriber.
///
/// Cleans up any subscribers that have been dropped.
fn send(&mut self, item: DiscoveryItem) {
fn send(&mut self, item: DiscoveryEvent) {
let mut clean_up = vec![];
for (i, subscriber) in self.0.iter().enumerate() {
// assume subscriber was dropped
Expand Down Expand Up @@ -234,6 +234,7 @@ impl MdnsDiscovery {
error!("MdnsDiscovery channel closed");
error!("closing MdnsDiscovery");
timeouts.abort_all();
discovery.remove_all();
return;
}
Some(msg) => msg,
Expand Down Expand Up @@ -266,6 +267,7 @@ impl MdnsDiscovery {
"removing node from MdnsDiscovery address book"
);
node_addrs.remove(&discovered_node_id);
subscribers.send(DiscoveryEvent::Expired(discovered_node_id));
continue;
}

Expand Down Expand Up @@ -298,7 +300,7 @@ impl MdnsDiscovery {
// in other words, nodes sent to the `subscribers` should only be the ones that
// have been "passively" discovered
if !resolved {
subscribers.send(item);
subscribers.send(DiscoveryEvent::Discovered(item));
}
}
Message::Resolve(node_id, sender) => {
Expand Down Expand Up @@ -346,8 +348,8 @@ impl MdnsDiscovery {
let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor")));
Ok(Self {
handle: AbortOnDropHandle::new(handle),
advertise,
sender: send,
advertise,
local_addrs,
})
}
Expand Down Expand Up @@ -450,7 +452,7 @@ impl Discovery for MdnsDiscovery {
}
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
use futures_util::FutureExt;

let (sender, recv) = mpsc::channel(20);
Expand Down Expand Up @@ -490,29 +492,99 @@ mod tests {
let user_data: UserData = "foobar".parse()?;
let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()]))
.with_user_data(Some(user_data.clone()));
println!("info {node_data:?}");

// resolve twice to ensure we can create separate streams for the same node_id
let mut s1 = discovery_a.resolve(node_id_b).unwrap();
let mut s2 = discovery_a.resolve(node_id_b).unwrap();
let mut s1 = discovery_a
.subscribe()
.unwrap()
.filter(|event| match event {
DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b,
_ => false,
});
let mut s2 = discovery_a
.subscribe()
.unwrap()
.filter(|event| match event {
DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b,
_ => false,
});

tracing::debug!(?node_id_b, "Discovering node id b");
// publish discovery_b's address
discovery_b.publish(&node_data);
let s1_res = tokio::time::timeout(Duration::from_secs(5), s1.next())
.await
.context("timeout")?
.unwrap()?;
let s2_res = tokio::time::timeout(Duration::from_secs(5), s2.next())
.await
.context("timeout")?
.unwrap()?;
let DiscoveryEvent::Discovered(s1_res) =
tokio::time::timeout(Duration::from_secs(5), s1.next())
.await
.context("timeout")?
.unwrap()
else {
panic!("Received unexpected discovery event");
};
let DiscoveryEvent::Discovered(s2_res) =
tokio::time::timeout(Duration::from_secs(5), s2.next())
.await
.context("timeout")?
.unwrap()
else {
panic!("Received unexpected discovery event");
};
assert_eq!(s1_res.node_info().data, node_data);
assert_eq!(s2_res.node_info().data, node_data);

Ok(())
}

#[tokio::test]
#[traced_test]
async fn mdns_publish_expire() -> Result {
let (_, discovery_a) = make_discoverer(false)?;
let (node_id_b, discovery_b) = make_discoverer(true)?;

// publish discovery_b's address
let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()]))
.with_user_data(Some("".parse()?));
discovery_b.publish(&node_data);

let mut s1 = discovery_a.subscribe().unwrap();
tracing::debug!(?node_id_b, "Discovering node id b");

// Wait for the specific node to be discovered
loop {
let event = tokio::time::timeout(Duration::from_secs(5), s1.next())
.await
.context("timeout")?
.expect("Stream should not be closed");

match event {
DiscoveryEvent::Discovered(item) if item.node_info().node_id == node_id_b => {
break;
}
_ => continue, // Ignore other discovery events
}
}

// Shutdown node B
drop(discovery_b);
tokio::time::sleep(Duration::from_secs(5)).await;

// Wait for the expiration event for the specific node
loop {
let event = tokio::time::timeout(Duration::from_secs(10), s1.next())
.await
.context("timeout waiting for expiration event")?
.expect("Stream should not be closed");

match event {
DiscoveryEvent::Expired(expired_node_id) if expired_node_id == node_id_b => {
break;
}
_ => continue, // Ignore other events
}
}

Ok(())
}

#[tokio::test]
#[traced_test]
async fn mdns_subscribe() -> Result {
Expand All @@ -537,7 +609,7 @@ mod tests {
let test = async move {
let mut got_ids = BTreeSet::new();
while got_ids.len() != num_nodes {
if let Some(item) = events.next().await {
if let Some(DiscoveryEvent::Discovered(item)) = events.next().await {
if node_ids.contains(&(item.node_id(), item.user_data())) {
got_ids.insert((item.node_id(), item.user_data()));
}
Expand Down
Loading