Skip to content

Commit 4895d64

Browse files
Correct msid handling for RtpSender (#217)
* Correct msid handling for RtpSender The previous logic would put negotiation into an endless loop after running `remove_track` due to incorrectly implementing the negotiation check and handling of stream ids. * Make sure we actually set the sending track to None The way this code was written the `set_sending_track` call could sometimes not run if `sender.stop()` failed. With this change we make sure to attempt both operations always, even if one fails. * Don't follow the specification Match libWebrtc's behaviour which doesn't update `transcevier.[[Direction]]` instead of following the specification. See: https://webrtc.googlesource.com/src/+/c501f30333ce8b46a92b75a6bf75733ddb0e9741/pc/sdp_offer_answer.cc#2018
1 parent 103d8e6 commit 4895d64

File tree

6 files changed

+169
-60
lines changed

6 files changed

+169
-60
lines changed

src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ pub enum Error {
132132
#[error("RtpSender not created by this PeerConnection")]
133133
ErrSenderNotCreatedByConnection,
134134

135+
/// ErrSenderInitialTrackIdAlreadySet indicates a second call to
136+
/// [`RtpSender::set_initial_track_id`] which is not allowed.
137+
#[error("RtpSender's initial_track_id has already been set")]
138+
ErrSenderInitialTrackIdAlreadySet,
139+
135140
/// ErrSessionDescriptionNoFingerprint indicates set_remote_description was called with a SessionDescription that has no
136141
/// fingerprint
137142
#[error("set_remote_description called with no fingerprint")]

src/peer_connection/mod.rs

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -447,27 +447,48 @@ impl RTCPeerConnection {
447447
// if t.stopping && !t.stopped {
448448
// return true
449449
// }
450-
let m = get_by_mid(t.mid().await.as_str(), local_desc);
450+
let mid = t.mid().await;
451+
let m = get_by_mid(&mid, local_desc);
451452
// Step 5.2
452453
if !t.stopped.load(Ordering::SeqCst) && m.is_none() {
453454
return true;
454455
}
455456
if !t.stopped.load(Ordering::SeqCst) {
456457
if let Some(m) = m {
457458
// Step 5.3.1
458-
if t.direction() == RTCRtpTransceiverDirection::Sendrecv
459-
|| t.direction() == RTCRtpTransceiverDirection::Sendonly
460-
{
461-
if let (Some(desc_msid), Some(sender)) =
462-
(m.attribute(ATTR_KEY_MSID).and_then(|o| o), t.sender().await)
463-
{
464-
if let Some(track) = &sender.track().await {
465-
if desc_msid != track.stream_id().to_owned() + " " + track.id()
466-
{
467-
return true;
468-
}
459+
if t.direction().has_send() {
460+
let dmsid = match m.attribute(ATTR_KEY_MSID).and_then(|o| o) {
461+
Some(m) => m,
462+
None => return true, // doesn't contain a single a=msid line
463+
};
464+
465+
let sender = match t.sender().await {
466+
Some(s) => s.clone(),
467+
None => {
468+
log::warn!(
469+
"RtpSender missing for transeceiver with sending direction {} for mid {}",
470+
t.direction(),
471+
mid
472+
);
473+
continue;
469474
}
470-
} else {
475+
};
476+
// (...)or the number of MSIDs from the a=msid lines in this m= section,
477+
// or the MSID values themselves, differ from what is in
478+
// transceiver.sender.[[AssociatedMediaStreamIds]], return true.
479+
480+
// TODO: This check should be robuster by storing all streams in the
481+
// local description so we can compare all of them. For no we only
482+
// consider the first one.
483+
484+
let stream_ids = sender.associated_media_stream_ids();
485+
// Different number of lines, 1 vs 0
486+
if stream_ids.is_empty() {
487+
return true;
488+
}
489+
490+
// different stream id
491+
if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
471492
return true;
472493
}
473494
}
@@ -1416,7 +1437,7 @@ impl RTCPeerConnection {
14161437

14171438
if we_offer {
14181439
// WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
1419-
// Section 4.4.1.5
1440+
// 4.5.9.2
14201441
// This is an answer from the remote.
14211442
if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
14221443
for media in &parsed.media_descriptions {
@@ -1445,17 +1466,19 @@ impl RTCPeerConnection {
14451466
if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
14461467
let previous_direction = t.direction();
14471468

1448-
// 4.9.2.9
1469+
// 4.5.9.2.9
14491470
// Let direction be an RTCRtpTransceiverDirection value representing the direction
14501471
// from the media description, but with the send and receive directions reversed to
14511472
// represent this peer's point of view. If the media description is rejected,
14521473
// set direction to "inactive".
14531474
let reversed_direction = direction.reverse();
14541475

1455-
// 4.9.2.13.2
1476+
// 4.5.9.2.13.2
14561477
// Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction.
1457-
t.set_direction_internal(reversed_direction);
14581478
t.set_current_direction(reversed_direction);
1479+
// TODO: According to the specification we should set
1480+
// transceiver.[[Direction]] here, however libWebrtc doesn't do this.
1481+
// t.set_direction_internal(reversed_direction);
14591482
t.process_new_current_direction(previous_direction).await?;
14601483
}
14611484
}
@@ -1745,14 +1768,24 @@ impl RTCPeerConnection {
17451768
}
17461769
}
17471770

1748-
if let Some(t) = transceiver {
1749-
if sender.stop().await.is_ok() && t.set_sending_track(None).await.is_ok() {
1750-
self.internal.trigger_negotiation_needed().await;
1751-
}
1752-
Ok(())
1753-
} else {
1754-
Err(Error::ErrSenderNotCreatedByConnection)
1771+
let t = transceiver.ok_or(Error::ErrSenderNotCreatedByConnection)?;
1772+
1773+
// This also happens in `set_sending_track` but we need to make sure we do this
1774+
// before we call sender.stop to avoid a race condition when removing tracks and
1775+
// generating offers.
1776+
t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
1777+
false,
1778+
t.direction().has_recv(),
1779+
));
1780+
// Stop the sender
1781+
let sender_result = sender.stop().await;
1782+
// This also updates direction
1783+
let sending_track_result = t.set_sending_track(None).await;
1784+
1785+
if sender_result.is_ok() && sending_track_result.is_ok() {
1786+
self.internal.trigger_negotiation_needed().await;
17551787
}
1788+
Ok(())
17561789
}
17571790

17581791
/// add_transceiver_from_kind Create a new RtpTransceiver and adds it to the set of transceivers.

src/peer_connection/sdp/mod.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,42 @@ pub(crate) async fn add_transceiver_sdp(
550550
track.stream_id().to_owned(), /* streamLabel */
551551
track.id().to_owned(),
552552
);
553-
if !is_plan_b {
554-
media = media.with_property_attribute(
555-
"msid:".to_owned() + track.stream_id() + " " + track.id(),
556-
);
553+
554+
// Send msid based on the configured track if we haven't already
555+
// sent on this sender. If we have sent we must keep the msid line consistent, this
556+
// is handled below.
557+
if !is_plan_b && sender.initial_track_id().is_none() {
558+
for stream_id in sender.associated_media_stream_ids() {
559+
media = media.with_property_attribute(format!(
560+
"msid:{} {}",
561+
stream_id,
562+
track.id()
563+
));
564+
}
565+
566+
sender.set_initial_track_id(track.id().to_string())?;
567+
break;
568+
}
569+
}
570+
571+
if !is_plan_b {
572+
if let Some(track_id) = sender.initial_track_id() {
573+
// After we have include an msid attribute in an offer it must stay the same for
574+
// all subsequent offer even if the track or transceiver direction changes.
575+
//
576+
// [RFC 8829 Section 5.2.2](https://datatracker.ietf.org/doc/html/rfc8829#section-5.2.2)
577+
//
578+
// For RtpTransceivers that are not stopped, the "a=msid" line or
579+
// lines MUST stay the same if they are present in the current
580+
// description, regardless of changes to the transceiver's direction
581+
// or track. If no "a=msid" line is present in the current
582+
// description, "a=msid" line(s) MUST be generated according to the
583+
// same rules as for an initial offer.
584+
for stream_id in sender.associated_media_stream_ids() {
585+
media = media
586+
.with_property_attribute(format!("msid:{} {}", stream_id, track_id));
587+
}
588+
557589
break;
558590
}
559591
}

src/rtp_transceiver/mod.rs

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,14 @@ impl RTCRtpTransceiver {
370370
.current_direction
371371
.swap(d as u8, Ordering::SeqCst)
372372
.into();
373-
trace!(
374-
"Changing current direction of transceiver from {} to {}",
375-
previous,
376-
d,
377-
);
373+
374+
if d != previous {
375+
trace!(
376+
"Changing current direction of transceiver from {} to {}",
377+
previous,
378+
d,
379+
);
380+
}
378381
}
379382

380383
/// Perform any subsequent actions after altering the transceiver's direction.
@@ -390,11 +393,15 @@ impl RTCRtpTransceiver {
390393
}
391394

392395
let current_direction = self.current_direction();
393-
trace!(
394-
"Processing transceiver direction change from {} to {}",
395-
previous_direction,
396-
current_direction
397-
);
396+
if previous_direction != current_direction {
397+
let mid = self.mid().await;
398+
trace!(
399+
"Processing transceiver({}) direction change from {} to {}",
400+
mid,
401+
previous_direction,
402+
current_direction
403+
);
404+
}
398405

399406
match (previous_direction, current_direction) {
400407
(a, b) if a == b => {
@@ -462,26 +469,13 @@ impl RTCRtpTransceiver {
462469
}
463470

464471
let direction = self.direction();
465-
if !track_is_none && direction == RTCRtpTransceiverDirection::Recvonly {
466-
self.set_direction_internal(RTCRtpTransceiverDirection::Sendrecv);
467-
} else if !track_is_none && direction == RTCRtpTransceiverDirection::Inactive {
468-
self.set_direction_internal(RTCRtpTransceiverDirection::Sendonly);
469-
} else if track_is_none && direction == RTCRtpTransceiverDirection::Sendrecv {
470-
self.set_direction_internal(RTCRtpTransceiverDirection::Recvonly);
471-
} else if !track_is_none
472-
&& (direction == RTCRtpTransceiverDirection::Sendonly
473-
|| direction == RTCRtpTransceiverDirection::Sendrecv)
474-
{
475-
// Handle the case where a sendonly transceiver was added by a negotiation
476-
// initiated by remote peer. For example a remote peer added a transceiver
477-
// with direction recvonly.
478-
//} else if !track_is_none && self.direction == RTPTransceiverDirection::Sendrecv {
479-
// Similar to above, but for sendrecv transceiver.
480-
} else if track_is_none && direction == RTCRtpTransceiverDirection::Sendonly {
481-
self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
482-
} else {
483-
return Err(Error::ErrRTPTransceiverSetSendingInvalidState);
484-
}
472+
let should_send = !track_is_none;
473+
let should_recv = direction.has_recv();
474+
self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
475+
should_send,
476+
should_recv,
477+
));
478+
485479
Ok(())
486480
}
487481
}

src/rtp_transceiver/rtp_sender/mod.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ pub struct RTCRtpSender {
9999

100100
pub(crate) id: String,
101101

102+
/// The id of the initial track, even if we later change to a different
103+
/// track id should be use when negotiating.
104+
pub(crate) initial_track_id: std::sync::Mutex<Option<String>>,
105+
/// AssociatedMediaStreamIds from the WebRTC specifcations
106+
pub(crate) associated_media_stream_ids: std::sync::Mutex<Vec<String>>,
107+
102108
rtp_transceiver: Mutex<Option<Weak<RTCRtpTransceiver>>>,
103109

104110
send_called_tx: Mutex<Option<mpsc::Sender<()>>>,
@@ -157,6 +163,7 @@ impl RTCRtpSender {
157163
*internal_rtcp_interceptor = Some(rtcp_interceptor);
158164
}
159165

166+
let stream_ids = vec![track.stream_id().to_string()];
160167
RTCRtpSender {
161168
track: Mutex::new(Some(track)),
162169

@@ -176,6 +183,8 @@ impl RTCRtpSender {
176183
interceptor,
177184

178185
id,
186+
initial_track_id: std::sync::Mutex::new(None),
187+
associated_media_stream_ids: std::sync::Mutex::new(stream_ids),
179188

180189
rtp_transceiver: Mutex::new(None),
181190

@@ -463,4 +472,40 @@ impl RTCRtpSender {
463472
pub(crate) async fn has_stopped(&self) -> bool {
464473
self.stop_called_signal.load(Ordering::SeqCst)
465474
}
475+
476+
pub(crate) fn initial_track_id(&self) -> Option<String> {
477+
let lock = self.initial_track_id.lock().unwrap();
478+
479+
lock.clone()
480+
}
481+
482+
pub(crate) fn set_initial_track_id(&self, id: String) -> Result<()> {
483+
let mut lock = self.initial_track_id.lock().unwrap();
484+
485+
if lock.is_some() {
486+
return Err(Error::ErrSenderInitialTrackIdAlreadySet);
487+
}
488+
489+
*lock = Some(id);
490+
491+
Ok(())
492+
}
493+
494+
pub(crate) fn associate_media_stream_id(&self, id: String) -> bool {
495+
let mut lock = self.associated_media_stream_ids.lock().unwrap();
496+
497+
if lock.contains(&id) {
498+
return false;
499+
}
500+
501+
lock.push(id);
502+
503+
true
504+
}
505+
506+
pub(crate) fn associated_media_stream_ids(&self) -> Vec<String> {
507+
let lock = self.associated_media_stream_ids.lock().unwrap();
508+
509+
lock.clone()
510+
}
466511
}

src/rtp_transceiver/rtp_transceiver_direction.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ impl RTCRtpTransceiverDirection {
9797
}
9898
}
9999

100-
fn has_send(&self) -> bool {
100+
pub fn has_send(&self) -> bool {
101101
matches!(self, Self::Sendrecv | Self::Sendonly)
102102
}
103103

104-
fn has_recv(&self) -> bool {
104+
pub fn has_recv(&self) -> bool {
105105
matches!(self, Self::Sendrecv | Self::Recvonly)
106106
}
107107
}

0 commit comments

Comments
 (0)