Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 105 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,5 @@ members = [
]
resolver = "2"

[workspace.dependencies]
tokio-test = { version = "0.4.4" }

[profile.dev]
opt-level = 0
4 changes: 3 additions & 1 deletion ice/src/candidate/candidate_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ impl Candidate for CandidateBase {
{
let mut closed_ch = self.closed_ch.lock().await;
if closed_ch.is_none() {
return Err(Error::ErrClosed);
// Если кандидат уже был ранее закрыт, не возвращать ошибку, а просто вернуть успех
return Ok(());
// return Err(Error::ErrClosed);
}
closed_ch.take();
}
Expand Down
63 changes: 16 additions & 47 deletions interceptor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,11 @@ pub trait Interceptor {
#[async_trait]
pub trait RTPWriter {
/// write a rtp packet
async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize>;
async fn write(&self, pkt: &rtp::packet::Packet) -> Result<usize>;
}

pub type RTPWriterBoxFn = Box<
dyn (Fn(
&rtp::packet::Packet,
&Attributes,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
dyn (Fn(&rtp::packet::Packet) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
+ Send
+ Sync,
>;
Expand All @@ -95,28 +92,20 @@ pub struct RTPWriterFn(pub RTPWriterBoxFn);
#[async_trait]
impl RTPWriter for RTPWriterFn {
/// write a rtp packet
async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize> {
self.0(pkt, attributes).await
async fn write(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
self.0(pkt).await
}
}

/// RTPReader is used by Interceptor.bind_remote_stream.
#[async_trait]
pub trait RTPReader {
/// read a rtp packet
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)>;
async fn read(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet>;
}

pub type RTPReaderBoxFn = Box<
dyn (Fn(
&mut [u8],
&Attributes,
)
-> Pin<Box<dyn Future<Output = Result<(rtp::packet::Packet, Attributes)>> + Send + Sync>>)
dyn (Fn(&mut [u8]) -> Pin<Box<dyn Future<Output = Result<rtp::packet::Packet>> + Send + Sync>>)
+ Send
+ Sync,
>;
Expand All @@ -125,30 +114,21 @@ pub struct RTPReaderFn(pub RTPReaderBoxFn);
#[async_trait]
impl RTPReader for RTPReaderFn {
/// read a rtp packet
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
self.0(buf, attributes).await
async fn read(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet> {
self.0(buf).await
}
}

/// RTCPWriter is used by Interceptor.bind_rtcpwriter.
#[async_trait]
pub trait RTCPWriter {
/// write a batch of rtcp packets
async fn write(
&self,
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
attributes: &Attributes,
) -> Result<usize>;
async fn write(&self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>]) -> Result<usize>;
}

pub type RTCPWriterBoxFn = Box<
dyn (Fn(
&[Box<dyn rtcp::packet::Packet + Send + Sync>],
&Attributes,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
+ Send
+ Sync,
Expand All @@ -159,12 +139,8 @@ pub struct RTCPWriterFn(pub RTCPWriterBoxFn);
#[async_trait]
impl RTCPWriter for RTCPWriterFn {
/// write a batch of rtcp packets
async fn write(
&self,
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
attributes: &Attributes,
) -> Result<usize> {
self.0(pkts, attributes).await
async fn write(&self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>]) -> Result<usize> {
self.0(pkts).await
}
}

Expand All @@ -175,22 +151,16 @@ pub trait RTCPReader {
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>;
) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>;
}

pub type RTCPReaderBoxFn = Box<
dyn (Fn(
&mut [u8],
&Attributes,
) -> Pin<
Box<
dyn Future<
Output = Result<(
Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>,
Attributes,
)>,
> + Send
dyn Future<Output = Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>>
+ Send
+ Sync,
>,
>) + Send
Expand All @@ -205,9 +175,8 @@ impl RTCPReader for RTCPReaderFn {
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
self.0(buf, attributes).await
) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
self.0(buf).await
}
}

Expand Down
39 changes: 13 additions & 26 deletions interceptor/src/mock/mock_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use util::Marshal;

use crate::error::{Error, Result};
use crate::stream_info::StreamInfo;
use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter};
use crate::{Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter};

type RTCPPackets = Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>;

Expand Down Expand Up @@ -91,10 +91,9 @@ impl MockStream {
.await;
tokio::spawn(async move {
let mut buf = vec![0u8; 1500];
let a = Attributes::new();
loop {
let pkts = match rtcp_reader.read(&mut buf, &a).await {
Ok((n, _)) => n,
let pkts = match rtcp_reader.read(&mut buf).await {
Ok(n) => n,
Err(err) => {
let _ = rtcp_in_modified_tx.send(Err(err)).await;
break;
Expand All @@ -113,10 +112,9 @@ impl MockStream {
.await;
tokio::spawn(async move {
let mut buf = vec![0u8; 1500];
let a = Attributes::new();
loop {
let pkt = match rtp_reader.read(&mut buf, &a).await {
Ok((pkt, _)) => pkt,
let pkt = match rtp_reader.read(&mut buf).await {
Ok(pkt) => pkt,
Err(err) => {
let _ = rtp_in_modified_tx.send(Err(err)).await;
break;
Expand All @@ -135,21 +133,19 @@ impl MockStream {
&self,
pkt: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
) -> Result<usize> {
let a = Attributes::new();
let rtcp_writer = self.rtcp_writer.lock().await;
if let Some(writer) = &*rtcp_writer {
writer.write(pkt, &a).await
writer.write(pkt).await
} else {
Err(Error::Other("invalid rtcp_writer".to_owned()))
}
}

/// write_rtp writes an rtp packet to the stream, using the interceptor
pub async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
let a = Attributes::new();
let rtp_writer = self.rtp_writer.lock().await;
if let Some(writer) = &*rtp_writer {
writer.write(pkt, &a).await
writer.write(pkt).await
} else {
Err(Error::Other("invalid rtp_writer".to_owned()))
}
Expand Down Expand Up @@ -229,11 +225,7 @@ impl MockStream {

#[async_trait]
impl RTCPWriter for MockStreamInternal {
async fn write(
&self,
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
_attributes: &Attributes,
) -> Result<usize> {
async fn write(&self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>]) -> Result<usize> {
let _ = self.rtcp_out_modified_tx.send(pkts.to_vec()).await;

Ok(0)
Expand All @@ -245,8 +237,7 @@ impl RTCPReader for MockStreamInternal {
async fn read(
&self,
buf: &mut [u8],
a: &Attributes,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
let pkts = {
let mut rtcp_in = self.rtcp_in_rx.lock().await;
rtcp_in.recv().await.ok_or(Error::ErrIoEOF)?
Expand All @@ -259,25 +250,21 @@ impl RTCPReader for MockStreamInternal {
}

buf[..n].copy_from_slice(&marshaled);
Ok((pkts, a.clone()))
Ok(pkts)
}
}

#[async_trait]
impl RTPWriter for MockStreamInternal {
async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize> {
async fn write(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
let _ = self.rtp_out_modified_tx.send(pkt.clone()).await;
Ok(0)
}
}

#[async_trait]
impl RTPReader for MockStreamInternal {
async fn read(
&self,
buf: &mut [u8],
a: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
async fn read(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet> {
let pkt = {
let mut rtp_in = self.rtp_in_rx.lock().await;
rtp_in.recv().await.ok_or(Error::ErrIoEOF)?
Expand All @@ -290,7 +277,7 @@ impl RTPReader for MockStreamInternal {
}

buf[..n].copy_from_slice(&marshaled);
Ok((pkt, a.clone()))
Ok(pkt)
}
}

Expand Down
10 changes: 3 additions & 7 deletions interceptor/src/nack/generator/generator_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,12 @@ impl GeneratorStream {
#[async_trait]
impl RTPReader for GeneratorStream {
/// read a rtp packet
async fn read(
&self,
buf: &mut [u8],
a: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;
async fn read(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet> {
let pkt = self.parent_rtp_reader.read(buf).await?;

self.add(pkt.header.sequence_number);

Ok((pkt, attr))
Ok(pkt)
}
}

Expand Down
7 changes: 2 additions & 5 deletions interceptor/src/nack/generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use waitgroup::WaitGroup;
use crate::error::{Error, Result};
use crate::nack::stream_support_nack;
use crate::stream_info::StreamInfo;
use crate::{
Attributes, Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter,
};
use crate::{Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter};

/// GeneratorBuilder can be used to configure Generator Interceptor
#[derive(Default)]
Expand Down Expand Up @@ -136,9 +134,8 @@ impl Generator {
nacks
};

let a = Attributes::new();
for nack in nacks{
if let Err(err) = rtcp_writer.write(&[Box::new(nack)], &a).await{
if let Err(err) = rtcp_writer.write(&[Box::new(nack)]).await{
log::warn!("failed sending nack: {err}");
}
}
Expand Down
14 changes: 5 additions & 9 deletions interceptor/src/nack/responder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use tokio::sync::Mutex;
use crate::error::Result;
use crate::nack::stream_support_nack;
use crate::stream_info::StreamInfo;
use crate::{
Attributes, Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter,
};
use crate::{Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter};

/// GeneratorBuilder can be used to configure Responder Interceptor
#[derive(Default)]
Expand Down Expand Up @@ -73,8 +71,7 @@ impl ResponderInternal {
let stream3 = Arc::clone(&stream2);
Box::pin(async move {
if let Some(p) = stream3.get(seq).await {
let a = Attributes::new();
if let Err(err) = stream3.next_rtp_writer.write(&p, &a).await {
if let Err(err) = stream3.next_rtp_writer.write(&p).await {
log::warn!("failed resending nacked packet: {err}");
}
}
Expand All @@ -101,9 +98,8 @@ impl RTCPReader for ResponderRtcpReader {
async fn read(
&self,
buf: &mut [u8],
a: &Attributes,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
let (pkts, attr) = { self.parent_rtcp_reader.read(buf, a).await? };
) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
let pkts = { self.parent_rtcp_reader.read(buf).await? };
for p in &pkts {
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
let nack = nack.clone();
Expand All @@ -114,7 +110,7 @@ impl RTCPReader for ResponderRtcpReader {
}
}

Ok((pkts, attr))
Ok(pkts)
}
}

Expand Down
6 changes: 3 additions & 3 deletions interceptor/src/nack/responder/responder_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::Mutex;

use crate::error::Result;
use crate::nack::UINT16SIZE_HALF;
use crate::{Attributes, RTPWriter};
use crate::RTPWriter;

struct ResponderStreamInternal {
packets: Vec<Option<rtp::packet::Packet>>,
Expand Down Expand Up @@ -90,10 +90,10 @@ impl ResponderStream {
#[async_trait]
impl RTPWriter for ResponderStream {
/// write a rtp packet
async fn write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result<usize> {
async fn write(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
self.add(pkt).await;

self.next_rtp_writer.write(pkt, a).await
self.next_rtp_writer.write(pkt).await
}
}

Expand Down
Loading
Loading