Skip to content

Correct msid handling for RtpSender #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ pub enum Error {
#[error("RtpSender not created by this PeerConnection")]
ErrSenderNotCreatedByConnection,

/// ErrSenderInitialTrackIdAlreadySet indicates a second call to
/// [`RtpSender::set_initial_track_id`] which is not allowed.
#[error("RtpSender's initial_track_id has already been set")]
ErrSenderInitialTrackIdAlreadySet,

/// ErrSessionDescriptionNoFingerprint indicates set_remote_description was called with a SessionDescription that has no
/// fingerprint
#[error("set_remote_description called with no fingerprint")]
Expand Down
81 changes: 57 additions & 24 deletions src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,27 +447,48 @@ impl RTCPeerConnection {
// if t.stopping && !t.stopped {
// return true
// }
let m = get_by_mid(t.mid().await.as_str(), local_desc);
let mid = t.mid().await;
let m = get_by_mid(&mid, local_desc);
// Step 5.2
if !t.stopped.load(Ordering::SeqCst) && m.is_none() {
return true;
}
if !t.stopped.load(Ordering::SeqCst) {
if let Some(m) = m {
// Step 5.3.1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://www.w3.org/TR/webrtc/#ref-for-dfn-check-if-negotiation-is-needed-1 for the steps outlined by the specification

if t.direction() == RTCRtpTransceiverDirection::Sendrecv
|| t.direction() == RTCRtpTransceiverDirection::Sendonly
{
if let (Some(desc_msid), Some(sender)) =
(m.attribute(ATTR_KEY_MSID).and_then(|o| o), t.sender().await)
{
if let Some(track) = &sender.track().await {
if desc_msid != track.stream_id().to_owned() + " " + track.id()
{
return true;
}
if t.direction().has_send() {
let dmsid = match m.attribute(ATTR_KEY_MSID).and_then(|o| o) {
Some(m) => m,
None => return true, // doesn't contain a single a=msid line
};

let sender = match t.sender().await {
Some(s) => s.clone(),
None => {
log::warn!(
"RtpSender missing for transeceiver with sending direction {} for mid {}",
t.direction(),
mid
);
continue;
}
} else {
};
// (...)or the number of MSIDs from the a=msid lines in this m= section,
// or the MSID values themselves, differ from what is in
// transceiver.sender.[[AssociatedMediaStreamIds]], return true.

// TODO: This check should be robuster by storing all streams in the
// local description so we can compare all of them. For no we only
// consider the first one.

let stream_ids = sender.associated_media_stream_ids();
// Different number of lines, 1 vs 0
if stream_ids.is_empty() {
return true;
}

// different stream id
if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
return true;
}
}
Expand Down Expand Up @@ -1416,7 +1437,7 @@ impl RTCPeerConnection {

if we_offer {
// WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
// Section 4.4.1.5
// 4.5.9.2
// This is an answer from the remote.
if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
for media in &parsed.media_descriptions {
Expand Down Expand Up @@ -1445,17 +1466,19 @@ impl RTCPeerConnection {
if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
let previous_direction = t.direction();

// 4.9.2.9
// 4.5.9.2.9
// Let direction be an RTCRtpTransceiverDirection value representing the direction
// from the media description, but with the send and receive directions reversed to
// represent this peer's point of view. If the media description is rejected,
// set direction to "inactive".
let reversed_direction = direction.reverse();

// 4.9.2.13.2
// 4.5.9.2.13.2
// Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction.
t.set_direction_internal(reversed_direction);
t.set_current_direction(reversed_direction);
// TODO: According to the specification we should set
// transceiver.[[Direction]] here, however libWebrtc doesn't do this.
// t.set_direction_internal(reversed_direction);
t.process_new_current_direction(previous_direction).await?;
}
}
Expand Down Expand Up @@ -1745,14 +1768,24 @@ impl RTCPeerConnection {
}
}

if let Some(t) = transceiver {
if sender.stop().await.is_ok() && t.set_sending_track(None).await.is_ok() {
self.internal.trigger_negotiation_needed().await;
}
Ok(())
} else {
Err(Error::ErrSenderNotCreatedByConnection)
let t = transceiver.ok_or(Error::ErrSenderNotCreatedByConnection)?;

// This also happens in `set_sending_track` but we need to make sure we do this
// before we call sender.stop to avoid a race condition when removing tracks and
// generating offers.
t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
false,
t.direction().has_recv(),
));
// Stop the sender
let sender_result = sender.stop().await;
// This also updates direction
let sending_track_result = t.set_sending_track(None).await;

if sender_result.is_ok() && sending_track_result.is_ok() {
self.internal.trigger_negotiation_needed().await;
}
Ok(())
}

/// add_transceiver_from_kind Create a new RtpTransceiver and adds it to the set of transceivers.
Expand Down
40 changes: 36 additions & 4 deletions src/peer_connection/sdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,42 @@ pub(crate) async fn add_transceiver_sdp(
track.stream_id().to_owned(), /* streamLabel */
track.id().to_owned(),
);
if !is_plan_b {
media = media.with_property_attribute(
"msid:".to_owned() + track.stream_id() + " " + track.id(),
);

// Send msid based on the configured track if we haven't already
// sent on this sender. If we have sent we must keep the msid line consistent, this
// is handled below.
if !is_plan_b && sender.initial_track_id().is_none() {
for stream_id in sender.associated_media_stream_ids() {
media = media.with_property_attribute(format!(
"msid:{} {}",
stream_id,
track.id()
));
}

sender.set_initial_track_id(track.id().to_string())?;
break;
}
}

if !is_plan_b {
if let Some(track_id) = sender.initial_track_id() {
// After we have include an msid attribute in an offer it must stay the same for
// all subsequent offer even if the track or transceiver direction changes.
//
// [RFC 8829 Section 5.2.2](https://datatracker.ietf.org/doc/html/rfc8829#section-5.2.2)
//
// For RtpTransceivers that are not stopped, the "a=msid" line or
// lines MUST stay the same if they are present in the current
// description, regardless of changes to the transceiver's direction
// or track. If no "a=msid" line is present in the current
// description, "a=msid" line(s) MUST be generated according to the
// same rules as for an initial offer.
for stream_id in sender.associated_media_stream_ids() {
media = media
.with_property_attribute(format!("msid:{} {}", stream_id, track_id));
}

break;
}
}
Expand Down
54 changes: 24 additions & 30 deletions src/rtp_transceiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,14 @@ impl RTCRtpTransceiver {
.current_direction
.swap(d as u8, Ordering::SeqCst)
.into();
trace!(
"Changing current direction of transceiver from {} to {}",
previous,
d,
);

if d != previous {
trace!(
"Changing current direction of transceiver from {} to {}",
previous,
d,
);
}
}

/// Perform any subsequent actions after altering the transceiver's direction.
Expand All @@ -390,11 +393,15 @@ impl RTCRtpTransceiver {
}

let current_direction = self.current_direction();
trace!(
"Processing transceiver direction change from {} to {}",
previous_direction,
current_direction
);
if previous_direction != current_direction {
let mid = self.mid().await;
trace!(
"Processing transceiver({}) direction change from {} to {}",
mid,
previous_direction,
current_direction
);
}

match (previous_direction, current_direction) {
(a, b) if a == b => {
Expand Down Expand Up @@ -462,26 +469,13 @@ impl RTCRtpTransceiver {
}

let direction = self.direction();
if !track_is_none && direction == RTCRtpTransceiverDirection::Recvonly {
self.set_direction_internal(RTCRtpTransceiverDirection::Sendrecv);
} else if !track_is_none && direction == RTCRtpTransceiverDirection::Inactive {
self.set_direction_internal(RTCRtpTransceiverDirection::Sendonly);
} else if track_is_none && direction == RTCRtpTransceiverDirection::Sendrecv {
self.set_direction_internal(RTCRtpTransceiverDirection::Recvonly);
} else if !track_is_none
&& (direction == RTCRtpTransceiverDirection::Sendonly
|| direction == RTCRtpTransceiverDirection::Sendrecv)
{
// Handle the case where a sendonly transceiver was added by a negotiation
// initiated by remote peer. For example a remote peer added a transceiver
// with direction recvonly.
//} else if !track_is_none && self.direction == RTPTransceiverDirection::Sendrecv {
// Similar to above, but for sendrecv transceiver.
} else if track_is_none && direction == RTCRtpTransceiverDirection::Sendonly {
self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
} else {
return Err(Error::ErrRTPTransceiverSetSendingInvalidState);
}
let should_send = !track_is_none;
let should_recv = direction.has_recv();
self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
should_send,
should_recv,
));

Ok(())
}
}
Expand Down
45 changes: 45 additions & 0 deletions src/rtp_transceiver/rtp_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ pub struct RTCRtpSender {

pub(crate) id: String,

/// The id of the initial track, even if we later change to a different
/// track id should be use when negotiating.
pub(crate) initial_track_id: std::sync::Mutex<Option<String>>,
/// AssociatedMediaStreamIds from the WebRTC specifcations
pub(crate) associated_media_stream_ids: std::sync::Mutex<Vec<String>>,

rtp_transceiver: Mutex<Option<Weak<RTCRtpTransceiver>>>,

send_called_tx: Mutex<Option<mpsc::Sender<()>>>,
Expand Down Expand Up @@ -157,6 +163,7 @@ impl RTCRtpSender {
*internal_rtcp_interceptor = Some(rtcp_interceptor);
}

let stream_ids = vec![track.stream_id().to_string()];
RTCRtpSender {
track: Mutex::new(Some(track)),

Expand All @@ -176,6 +183,8 @@ impl RTCRtpSender {
interceptor,

id,
initial_track_id: std::sync::Mutex::new(None),
associated_media_stream_ids: std::sync::Mutex::new(stream_ids),

rtp_transceiver: Mutex::new(None),

Expand Down Expand Up @@ -463,4 +472,40 @@ impl RTCRtpSender {
pub(crate) async fn has_stopped(&self) -> bool {
self.stop_called_signal.load(Ordering::SeqCst)
}

pub(crate) fn initial_track_id(&self) -> Option<String> {
let lock = self.initial_track_id.lock().unwrap();

lock.clone()
}

pub(crate) fn set_initial_track_id(&self, id: String) -> Result<()> {
let mut lock = self.initial_track_id.lock().unwrap();

if lock.is_some() {
return Err(Error::ErrSenderInitialTrackIdAlreadySet);
}

*lock = Some(id);

Ok(())
}

pub(crate) fn associate_media_stream_id(&self, id: String) -> bool {
let mut lock = self.associated_media_stream_ids.lock().unwrap();

if lock.contains(&id) {
return false;
}

lock.push(id);

true
}

pub(crate) fn associated_media_stream_ids(&self) -> Vec<String> {
let lock = self.associated_media_stream_ids.lock().unwrap();

lock.clone()
}
}
4 changes: 2 additions & 2 deletions src/rtp_transceiver/rtp_transceiver_direction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ impl RTCRtpTransceiverDirection {
}
}

fn has_send(&self) -> bool {
pub fn has_send(&self) -> bool {
matches!(self, Self::Sendrecv | Self::Sendonly)
}

fn has_recv(&self) -> bool {
pub fn has_recv(&self) -> bool {
matches!(self, Self::Sendrecv | Self::Recvonly)
}
}
Expand Down