Skip to content
This repository was archived by the owner on Aug 23, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions src/nack/responder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
};
use responder_stream::ResponderStream;

use crate::error::{Error, Result};
use crate::error::Result;
use crate::nack::stream_support_nack;

use async_trait::async_trait;
Expand Down Expand Up @@ -44,7 +44,6 @@ impl InterceptorBuilder for ResponderBuilder {
13 // 8192 = 1 << 13
},
streams: Arc::new(Mutex::new(HashMap::new())),
parent_rtcp_reader: Mutex::new(None),
}),
}))
}
Expand All @@ -53,7 +52,6 @@ impl InterceptorBuilder for ResponderBuilder {
pub struct ResponderInternal {
log2_size: u8,
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
}

impl ResponderInternal {
Expand Down Expand Up @@ -92,27 +90,22 @@ impl ResponderInternal {
}
}

pub struct ResponderRtcpReader {
parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
internal: Arc<ResponderInternal>,
}

#[async_trait]
impl RTCPReader for ResponderInternal {
impl RTCPReader for ResponderRtcpReader {
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
let (n, attr) = {
let parent_rtcp_reader = {
let parent_rtcp_reader = self.parent_rtcp_reader.lock().await;
parent_rtcp_reader.clone()
};
if let Some(reader) = parent_rtcp_reader {
reader.read(buf, a).await?
} else {
return Err(Error::ErrInvalidParentRtcpReader);
}
};
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };

let mut b = &buf[..n];
let pkts = rtcp::packet::unmarshal(&mut b)?;
for p in &pkts {
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
let nack = nack.clone();
let streams = Arc::clone(&self.streams);
let streams = Arc::clone(&self.internal.streams);
tokio::spawn(async move {
ResponderInternal::resend_packets(streams, nack).await;
});
Expand Down Expand Up @@ -143,12 +136,10 @@ impl Interceptor for Responder {
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
{
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
*parent_rtcp_reader = Some(reader);
}

Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
Arc::new(ResponderRtcpReader {
internal: Arc::clone(&self.internal),
parent_rtcp_reader: reader,
}) as Arc<dyn RTCPReader + Send + Sync>
}

/// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
Expand Down
1 change: 0 additions & 1 deletion src/report/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ impl ReportBuilder {
Duration::from_secs(1)
},
now: self.now.clone(),
parent_rtcp_reader: Mutex::new(None),
streams: Mutex::new(HashMap::new()),
close_rx: Mutex::new(Some(close_rx)),
}),
Expand Down
34 changes: 13 additions & 21 deletions src/report/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,24 @@ use waitgroup::WaitGroup;
pub(crate) struct ReceiverReportInternal {
pub(crate) interval: Duration,
pub(crate) now: Option<FnTimeGen>,
pub(crate) parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>,
pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
}

pub(crate) struct ReceiverReportRtcpReader {
pub(crate) internal: Arc<ReceiverReportInternal>,
pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
}

#[async_trait]
impl RTCPReader for ReceiverReportInternal {
impl RTCPReader for ReceiverReportRtcpReader {
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
let (n, attr) = {
let parent_rtcp_reader = {
let parent_rtcp_reader = self.parent_rtcp_reader.lock().await;
parent_rtcp_reader.clone()
};
if let Some(reader) = parent_rtcp_reader {
reader.read(buf, a).await?
} else {
return Err(Error::ErrInvalidParentRtcpReader);
}
};
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };

let mut b = &buf[..n];
let pkts = rtcp::packet::unmarshal(&mut b)?;

let now = if let Some(f) = &self.now {
let now = if let Some(f) = &self.internal.now {
f().await
} else {
SystemTime::now()
Expand All @@ -50,7 +44,7 @@ impl RTCPReader for ReceiverReportInternal {
.downcast_ref::<rtcp::sender_report::SenderReport>()
{
let stream = {
let m = self.streams.lock().await;
let m = self.internal.streams.lock().await;
m.get(&sr.ssrc).cloned()
};
if let Some(stream) = stream {
Expand Down Expand Up @@ -136,12 +130,10 @@ impl Interceptor for ReceiverReport {
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
{
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
*parent_rtcp_reader = Some(reader);
}

Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
Arc::new(ReceiverReportRtcpReader {
internal: Arc::clone(&self.internal),
parent_rtcp_reader: reader,
})
}

/// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
Expand Down