Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl PeerConnectionInternal {
}
}

self.start_rtp_receivers(&mut track_details, &current_transceivers)
self.start_rtp_receivers(&mut track_details, &current_transceivers, is_renegotiation)
.await?;
if let Some(parsed_remote) = &remote_desc.parsed {
let current_local_desc = self.current_local_description.lock().await;
Expand Down Expand Up @@ -418,17 +418,23 @@ impl PeerConnectionInternal {
self: &Arc<Self>,
incoming_tracks: &mut Vec<TrackDetails>,
local_transceivers: &[Arc<RTCRtpTransceiver>],
is_renegotiation: bool,
) -> Result<()> {
// Ensure we haven't already started a transceiver for this ssrc
// Ensure we haven't already started a transceiver for this ssrc.
// Skip filtering during renegotiation since receiver reuse logic handles it.
let mut filtered_tracks = incoming_tracks.clone();
for incoming_track in incoming_tracks {
// If we already have a TrackRemote for a given SSRC don't handle it again
for t in local_transceivers {
let receiver = t.receiver().await;
for track in receiver.tracks().await {
for ssrc in &incoming_track.ssrcs {
if *ssrc == track.ssrc() {
filter_track_with_ssrc(&mut filtered_tracks, track.ssrc());

if !is_renegotiation {
for incoming_track in incoming_tracks {
// If we already have a TrackRemote for a given SSRC don't handle it again
for t in local_transceivers {
let receiver = t.receiver().await;
let existing_tracks = receiver.tracks().await;
for track in existing_tracks {
for ssrc in &incoming_track.ssrcs {
if *ssrc == track.ssrc() {
filter_track_with_ssrc(&mut filtered_tracks, track.ssrc());
}
}
}
}
Expand All @@ -450,10 +456,31 @@ impl PeerConnectionInternal {
continue;
}

// Fix(issue-749): Handle receiver reuse during renegotiation in mesh topology.
//
// During SDP renegotiation, the same tracks (SSRCs) legitimately appear in
// subsequent negotiation rounds per RFC 8829 Section 3.7. Receivers that are
// already active should be recognized as handling their existing tracks rather
// than being skipped and marked as "NOT HANDLED".
//
// Root cause: The original code didn't distinguish between initial negotiation
// (where skipping active receivers prevents duplicates) and renegotiation
// (where active receivers represent existing media flows to preserve).
let receiver = t.receiver().await;
if receiver.have_received().await {
continue;
let already_receiving = receiver.have_received().await;

if already_receiving {
if !is_renegotiation {
// Initial negotiation: skip if already receiving (safety check)
continue;
} else {
// Renegotiation: receiver already active, mark as handled
track_handled = true;
break;
}
}

// Start receiver for new tracks only
PeerConnectionInternal::start_receiver(
self.setting_engine.get_receive_mtu(),
incoming_track,
Expand Down
271 changes: 271 additions & 0 deletions webrtc/src/peer_connection/peer_connection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,274 @@ async fn test_peer_connection_state() -> Result<()> {

Ok(())
}

// test_kind: bug_reproducer(issue-749)
/// # Bug Reproducer: Receiver Reuse During Renegotiation in Mesh Topology
///
/// ## Root Cause
///
/// The `start_rtp_receivers()` function in `peer_connection_internal.rs` skipped
/// transceivers that were already receiving (`have_received()=true`) during SDP
/// renegotiation, causing tracks to be marked as "NOT HANDLED" despite receivers
/// being active and media flowing correctly.
///
/// The bug occurred because the code didn't distinguish between two contexts:
/// - **Initial negotiation**: Skipping active receivers prevents duplicate starts (CORRECT)
/// - **Renegotiation**: Skipping active receivers breaks RFC 8829 compliance (BUG)
///
/// During renegotiation in mesh topologies, the same tracks (SSRCs) legitimately
/// reappear in the SDP per RFC 8829 Section 3.7's requirement to "reuse existing
/// media descriptions". The code incorrectly treated these as duplicates to skip
/// rather than existing flows to preserve.
///
/// ## Why Not Caught
///
/// No automated tests covered SDP renegotiation scenarios in mesh topologies.
/// Existing tests only validated the initial negotiation path with simple 1-to-1
/// peer connections.
///
/// The bug only manifests when:
/// 1. Multiple negotiation rounds occur (initial + renegotiation)
/// 2. The same SSRCs appear in subsequent SDPs (expected per RFC 8829)
/// 3. Receivers are already active when renegotiation begins
///
/// Single-round negotiation tests passed because `have_received()` returns `false`
/// during initial setup, so the skip logic was never triggered.
///
/// ## Fix Applied
///
/// Added `is_renegotiation: bool` parameter to `start_rtp_receivers()` to enable
/// context-aware handling:
///
/// ```rust
/// if already_receiving {
/// if !is_renegotiation {
/// continue; // Initial: skip to prevent duplicates (safety)
/// } else {
/// track_handled = true; // Renegotiation: mark as handled (RFC 8829)
/// break;
/// }
/// }
/// ```
///
/// Additionally, SSRC filtering is skipped during renegotiation since existing
/// SSRCs are expected to reappear per the specification.
///
/// ## Prevention
///
/// All WebRTC renegotiation code must:
/// 1. Distinguish between initial negotiation and renegotiation contexts
/// 2. Test multi-round negotiation scenarios, not just initial setup
/// 3. Verify compliance with RFC 8829 Section 3.7 (reuse of media descriptions)
/// 4. Consider mesh topology scenarios where renegotiation is common
///
/// Any code checking receiver state (`have_received()`, transceiver status, etc.)
/// during SDP processing should consider whether the behavior differs between
/// initial and subsequent negotiations.
///
/// ## Pitfall
///
/// **Never assume `have_received()=true` always means "skip this receiver".**
///
/// Context matters critically:
/// - Initial negotiation: `have_received()=true` indicates a safety issue (duplicate)
/// - Renegotiation: `have_received()=true` indicates RFC 8829 compliance (reuse)
///
/// The same receiver state has opposite meanings in different contexts. Always
/// check whether the current operation is initial negotiation or renegotiation
/// before making flow control decisions based on receiver state.
///
/// Failing to consider context leads to either:
/// - False positives (marking valid reused tracks as "NOT HANDLED")
/// - False negatives (allowing duplicate receivers during initial setup)
#[tokio::test]
async fn test_receiver_reuse_during_renegotiation_issue_749() -> Result<()> {
// Setup: Create peer connection pair with media engine
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let api = APIBuilder::new().with_media_engine(m).build();

let (mut pc_offer, mut pc_answer) = new_pair(&api).await?;

// Track receiver counts
let initial_track_count = Arc::new(AtomicU32::new(0));
let renegotiation_track_count = Arc::new(AtomicU32::new(0));

let initial_count_clone = Arc::clone(&initial_track_count);
let renegotiation_count_clone = Arc::clone(&renegotiation_track_count);
let negotiation_phase = Arc::new(AtomicU32::new(0)); // 0=initial, 1=renegotiation
let phase_clone = Arc::clone(&negotiation_phase);

pc_answer.on_track(Box::new(move |_track, _receiver, _transceiver| {
let phase = phase_clone.load(Ordering::SeqCst);
if phase == 0 {
initial_count_clone.fetch_add(1, Ordering::SeqCst);
} else {
renegotiation_count_clone.fetch_add(1, Ordering::SeqCst);
}
Box::pin(async move {})
}));

// Step 1: Add initial tracks (video + audio) to offerer
let video_track = Arc::new(TrackLocalStaticSample::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
"video_initial".to_owned(),
"stream_initial".to_owned(),
));

let audio_track = Arc::new(TrackLocalStaticSample::new(
RTCRtpCodecCapability {
mime_type: "audio/opus".to_owned(),
..Default::default()
},
"audio_initial".to_owned(),
"stream_initial".to_owned(),
));

pc_offer.add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>).await?;
pc_offer.add_track(Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>).await?;

// Step 2: Perform initial negotiation
signal_pair(&mut pc_offer, &mut pc_answer).await?;

// Wait for ICE connection
tokio::time::sleep(Duration::from_millis(500)).await;

// Step 3: Verify initial tracks were handled
let initial_transceivers = pc_answer.get_transceivers().await;
assert_eq!(
initial_transceivers.len(),
2,
"Should have 2 transceivers after initial negotiation"
);

// Verify receivers are active
for (idx, t) in initial_transceivers.iter().enumerate() {
let receiver = t.receiver().await;
assert!(
receiver.have_received().await,
"Receiver {} should be active after initial negotiation",
idx
);
}

// Capture initial SSRCs for verification
let mut initial_ssrcs = Vec::new();
for t in &initial_transceivers {
let receiver = t.receiver().await;
let tracks = receiver.tracks().await;
if !tracks.is_empty() {
initial_ssrcs.push(tracks[0].ssrc());
}
}
assert_eq!(
initial_ssrcs.len(),
2,
"Should have captured 2 initial SSRCs"
);

// Step 4: Add new track to trigger renegotiation
negotiation_phase.store(1, Ordering::SeqCst); // Mark as renegotiation phase

let new_video_track = Arc::new(TrackLocalStaticSample::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
"video_new".to_owned(),
"stream_new".to_owned(),
));

pc_offer.add_track(new_video_track as Arc<dyn TrackLocal + Send + Sync>).await?;

// Step 5: Perform renegotiation
let reoffer = pc_offer.create_offer(None).await?;

// Verify the SDP includes existing SSRCs (per RFC 8829 Section 3.7)
let offer_sdp = reoffer.sdp.clone();
for ssrc in &initial_ssrcs {
assert!(
offer_sdp.contains(&ssrc.to_string()),
"Renegotiation SDP should include existing SSRC {} (RFC 8829 requirement)",
ssrc
);
}

let mut offer_gathering_complete = pc_offer.gathering_complete_promise().await;
pc_offer.set_local_description(reoffer).await?;
let _ = offer_gathering_complete.recv().await;

pc_answer
.set_remote_description(
pc_offer
.local_description()
.await
.ok_or(Error::new("no local description".to_owned()))?,
)
.await?;

let reanswer = pc_answer.create_answer(None).await?;
let mut answer_gathering_complete = pc_answer.gathering_complete_promise().await;
pc_answer.set_local_description(reanswer).await?;
let _ = answer_gathering_complete.recv().await;

pc_offer
.set_remote_description(
pc_answer
.local_description()
.await
.ok_or(Error::new("no local description".to_owned()))?,
)
.await?;

// Wait for renegotiation to complete
tokio::time::sleep(Duration::from_millis(500)).await;

// Step 6: CRITICAL ASSERTION - Verify existing receivers marked as HANDLED
// This is where the bug manifested: existing receivers were skipped and
// marked as "NOT HANDLED" despite being active.
let transceivers_after = pc_answer.get_transceivers().await;
assert_eq!(
transceivers_after.len(),
3,
"Should have 3 transceivers after renegotiation (2 existing + 1 new)"
);

// Verify existing tracks still active with same SSRCs (receiver reused, not restarted)
for (idx, expected_ssrc) in initial_ssrcs.iter().enumerate() {
let receiver = transceivers_after[idx].receiver().await;
assert!(
receiver.have_received().await,
"Existing receiver {} should still be active after renegotiation",
idx
);

let tracks = receiver.tracks().await;
assert_eq!(
tracks.len(),
1,
"Existing receiver {} should have 1 track",
idx
);
assert_eq!(
tracks[0].ssrc(),
*expected_ssrc,
"SSRC should match original (receiver reused, not duplicated)"
);
}

// Verify new receiver also started
let new_receiver = transceivers_after[2].receiver().await;
assert!(
new_receiver.have_received().await,
"New receiver should be active"
);

// Cleanup
close_pair_now(&pc_offer, &pc_answer).await;

Ok(())
}
Loading