diff --git a/webrtc/src/peer_connection/peer_connection_internal.rs b/webrtc/src/peer_connection/peer_connection_internal.rs index 0b5ceaec0..393d72cb4 100644 --- a/webrtc/src/peer_connection/peer_connection_internal.rs +++ b/webrtc/src/peer_connection/peer_connection_internal.rs @@ -284,7 +284,7 @@ impl PeerConnectionInternal { } } - self.start_rtp_receivers(&mut track_details, ¤t_transceivers) + self.start_rtp_receivers(&mut track_details, ¤t_transceivers, is_renegotiation) .await?; if let Some(parsed_remote) = &remote_desc.parsed { let current_local_desc = self.current_local_description.lock().await; @@ -418,17 +418,23 @@ impl PeerConnectionInternal { self: &Arc, incoming_tracks: &mut Vec, local_transceivers: &[Arc], + is_renegotiation: bool, ) -> Result<()> { - // Ensure we haven't already started a transceiver for this ssrc + // Ensure we haven't already started a transceiver for this ssrc. + // Skip filtering during renegotiation since receiver reuse logic handles it. let mut filtered_tracks = incoming_tracks.clone(); - for incoming_track in incoming_tracks { - // If we already have a TrackRemote for a given SSRC don't handle it again - for t in local_transceivers { - let receiver = t.receiver().await; - for track in receiver.tracks().await { - for ssrc in &incoming_track.ssrcs { - if *ssrc == track.ssrc() { - filter_track_with_ssrc(&mut filtered_tracks, track.ssrc()); + + if !is_renegotiation { + for incoming_track in incoming_tracks { + // If we already have a TrackRemote for a given SSRC don't handle it again + for t in local_transceivers { + let receiver = t.receiver().await; + let existing_tracks = receiver.tracks().await; + for track in existing_tracks { + for ssrc in &incoming_track.ssrcs { + if *ssrc == track.ssrc() { + filter_track_with_ssrc(&mut filtered_tracks, track.ssrc()); + } } } } @@ -450,10 +456,31 @@ impl PeerConnectionInternal { continue; } + // Fix(issue-749): Handle receiver reuse during renegotiation in mesh topology. + // + // During SDP renegotiation, the same tracks (SSRCs) legitimately appear in + // subsequent negotiation rounds per RFC 8829 Section 3.7. Receivers that are + // already active should be recognized as handling their existing tracks rather + // than being skipped and marked as "NOT HANDLED". + // + // Root cause: The original code didn't distinguish between initial negotiation + // (where skipping active receivers prevents duplicates) and renegotiation + // (where active receivers represent existing media flows to preserve). let receiver = t.receiver().await; - if receiver.have_received().await { - continue; + let already_receiving = receiver.have_received().await; + + if already_receiving { + if !is_renegotiation { + // Initial negotiation: skip if already receiving (safety check) + continue; + } else { + // Renegotiation: receiver already active, mark as handled + track_handled = true; + break; + } } + + // Start receiver for new tracks only PeerConnectionInternal::start_receiver( self.setting_engine.get_receive_mtu(), incoming_track, diff --git a/webrtc/src/peer_connection/peer_connection_test.rs b/webrtc/src/peer_connection/peer_connection_test.rs index cb001fe4b..396d04b6a 100644 --- a/webrtc/src/peer_connection/peer_connection_test.rs +++ b/webrtc/src/peer_connection/peer_connection_test.rs @@ -740,3 +740,274 @@ async fn test_peer_connection_state() -> Result<()> { Ok(()) } + +// test_kind: bug_reproducer(issue-749) +/// # Bug Reproducer: Receiver Reuse During Renegotiation in Mesh Topology +/// +/// ## Root Cause +/// +/// The `start_rtp_receivers()` function in `peer_connection_internal.rs` skipped +/// transceivers that were already receiving (`have_received()=true`) during SDP +/// renegotiation, causing tracks to be marked as "NOT HANDLED" despite receivers +/// being active and media flowing correctly. +/// +/// The bug occurred because the code didn't distinguish between two contexts: +/// - **Initial negotiation**: Skipping active receivers prevents duplicate starts (CORRECT) +/// - **Renegotiation**: Skipping active receivers breaks RFC 8829 compliance (BUG) +/// +/// During renegotiation in mesh topologies, the same tracks (SSRCs) legitimately +/// reappear in the SDP per RFC 8829 Section 3.7's requirement to "reuse existing +/// media descriptions". The code incorrectly treated these as duplicates to skip +/// rather than existing flows to preserve. +/// +/// ## Why Not Caught +/// +/// No automated tests covered SDP renegotiation scenarios in mesh topologies. +/// Existing tests only validated the initial negotiation path with simple 1-to-1 +/// peer connections. +/// +/// The bug only manifests when: +/// 1. Multiple negotiation rounds occur (initial + renegotiation) +/// 2. The same SSRCs appear in subsequent SDPs (expected per RFC 8829) +/// 3. Receivers are already active when renegotiation begins +/// +/// Single-round negotiation tests passed because `have_received()` returns `false` +/// during initial setup, so the skip logic was never triggered. +/// +/// ## Fix Applied +/// +/// Added `is_renegotiation: bool` parameter to `start_rtp_receivers()` to enable +/// context-aware handling: +/// +/// ```rust +/// if already_receiving { +/// if !is_renegotiation { +/// continue; // Initial: skip to prevent duplicates (safety) +/// } else { +/// track_handled = true; // Renegotiation: mark as handled (RFC 8829) +/// break; +/// } +/// } +/// ``` +/// +/// Additionally, SSRC filtering is skipped during renegotiation since existing +/// SSRCs are expected to reappear per the specification. +/// +/// ## Prevention +/// +/// All WebRTC renegotiation code must: +/// 1. Distinguish between initial negotiation and renegotiation contexts +/// 2. Test multi-round negotiation scenarios, not just initial setup +/// 3. Verify compliance with RFC 8829 Section 3.7 (reuse of media descriptions) +/// 4. Consider mesh topology scenarios where renegotiation is common +/// +/// Any code checking receiver state (`have_received()`, transceiver status, etc.) +/// during SDP processing should consider whether the behavior differs between +/// initial and subsequent negotiations. +/// +/// ## Pitfall +/// +/// **Never assume `have_received()=true` always means "skip this receiver".** +/// +/// Context matters critically: +/// - Initial negotiation: `have_received()=true` indicates a safety issue (duplicate) +/// - Renegotiation: `have_received()=true` indicates RFC 8829 compliance (reuse) +/// +/// The same receiver state has opposite meanings in different contexts. Always +/// check whether the current operation is initial negotiation or renegotiation +/// before making flow control decisions based on receiver state. +/// +/// Failing to consider context leads to either: +/// - False positives (marking valid reused tracks as "NOT HANDLED") +/// - False negatives (allowing duplicate receivers during initial setup) +#[tokio::test] +async fn test_receiver_reuse_during_renegotiation_issue_749() -> Result<()> { + // Setup: Create peer connection pair with media engine + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (mut pc_offer, mut pc_answer) = new_pair(&api).await?; + + // Track receiver counts + let initial_track_count = Arc::new(AtomicU32::new(0)); + let renegotiation_track_count = Arc::new(AtomicU32::new(0)); + + let initial_count_clone = Arc::clone(&initial_track_count); + let renegotiation_count_clone = Arc::clone(&renegotiation_track_count); + let negotiation_phase = Arc::new(AtomicU32::new(0)); // 0=initial, 1=renegotiation + let phase_clone = Arc::clone(&negotiation_phase); + + pc_answer.on_track(Box::new(move |_track, _receiver, _transceiver| { + let phase = phase_clone.load(Ordering::SeqCst); + if phase == 0 { + initial_count_clone.fetch_add(1, Ordering::SeqCst); + } else { + renegotiation_count_clone.fetch_add(1, Ordering::SeqCst); + } + Box::pin(async move {}) + })); + + // Step 1: Add initial tracks (video + audio) to offerer + let video_track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video_initial".to_owned(), + "stream_initial".to_owned(), + )); + + let audio_track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: "audio/opus".to_owned(), + ..Default::default() + }, + "audio_initial".to_owned(), + "stream_initial".to_owned(), + )); + + pc_offer.add_track(Arc::clone(&video_track) as Arc).await?; + pc_offer.add_track(Arc::clone(&audio_track) as Arc).await?; + + // Step 2: Perform initial negotiation + signal_pair(&mut pc_offer, &mut pc_answer).await?; + + // Wait for ICE connection + tokio::time::sleep(Duration::from_millis(500)).await; + + // Step 3: Verify initial tracks were handled + let initial_transceivers = pc_answer.get_transceivers().await; + assert_eq!( + initial_transceivers.len(), + 2, + "Should have 2 transceivers after initial negotiation" + ); + + // Verify receivers are active + for (idx, t) in initial_transceivers.iter().enumerate() { + let receiver = t.receiver().await; + assert!( + receiver.have_received().await, + "Receiver {} should be active after initial negotiation", + idx + ); + } + + // Capture initial SSRCs for verification + let mut initial_ssrcs = Vec::new(); + for t in &initial_transceivers { + let receiver = t.receiver().await; + let tracks = receiver.tracks().await; + if !tracks.is_empty() { + initial_ssrcs.push(tracks[0].ssrc()); + } + } + assert_eq!( + initial_ssrcs.len(), + 2, + "Should have captured 2 initial SSRCs" + ); + + // Step 4: Add new track to trigger renegotiation + negotiation_phase.store(1, Ordering::SeqCst); // Mark as renegotiation phase + + let new_video_track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video_new".to_owned(), + "stream_new".to_owned(), + )); + + pc_offer.add_track(new_video_track as Arc).await?; + + // Step 5: Perform renegotiation + let reoffer = pc_offer.create_offer(None).await?; + + // Verify the SDP includes existing SSRCs (per RFC 8829 Section 3.7) + let offer_sdp = reoffer.sdp.clone(); + for ssrc in &initial_ssrcs { + assert!( + offer_sdp.contains(&ssrc.to_string()), + "Renegotiation SDP should include existing SSRC {} (RFC 8829 requirement)", + ssrc + ); + } + + let mut offer_gathering_complete = pc_offer.gathering_complete_promise().await; + pc_offer.set_local_description(reoffer).await?; + let _ = offer_gathering_complete.recv().await; + + pc_answer + .set_remote_description( + pc_offer + .local_description() + .await + .ok_or(Error::new("no local description".to_owned()))?, + ) + .await?; + + let reanswer = pc_answer.create_answer(None).await?; + let mut answer_gathering_complete = pc_answer.gathering_complete_promise().await; + pc_answer.set_local_description(reanswer).await?; + let _ = answer_gathering_complete.recv().await; + + pc_offer + .set_remote_description( + pc_answer + .local_description() + .await + .ok_or(Error::new("no local description".to_owned()))?, + ) + .await?; + + // Wait for renegotiation to complete + tokio::time::sleep(Duration::from_millis(500)).await; + + // Step 6: CRITICAL ASSERTION - Verify existing receivers marked as HANDLED + // This is where the bug manifested: existing receivers were skipped and + // marked as "NOT HANDLED" despite being active. + let transceivers_after = pc_answer.get_transceivers().await; + assert_eq!( + transceivers_after.len(), + 3, + "Should have 3 transceivers after renegotiation (2 existing + 1 new)" + ); + + // Verify existing tracks still active with same SSRCs (receiver reused, not restarted) + for (idx, expected_ssrc) in initial_ssrcs.iter().enumerate() { + let receiver = transceivers_after[idx].receiver().await; + assert!( + receiver.have_received().await, + "Existing receiver {} should still be active after renegotiation", + idx + ); + + let tracks = receiver.tracks().await; + assert_eq!( + tracks.len(), + 1, + "Existing receiver {} should have 1 track", + idx + ); + assert_eq!( + tracks[0].ssrc(), + *expected_ssrc, + "SSRC should match original (receiver reused, not duplicated)" + ); + } + + // Verify new receiver also started + let new_receiver = transceivers_after[2].receiver().await; + assert!( + new_receiver.have_received().await, + "New receiver should be active" + ); + + // Cleanup + close_pair_now(&pc_offer, &pc_answer).await; + + Ok(()) +}