Skip to content

Commit b6e0f82

Browse files
committed
Add stop_transceiver/2
1 parent d9fe13a commit b6e0f82

File tree

5 files changed

+309
-48
lines changed

5 files changed

+309
-48
lines changed

lib/ex_webrtc/peer_connection.ex

Lines changed: 168 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ defmodule ExWebRTC.PeerConnection do
4141
| {:connection_state_change, connection_state()}
4242
| {:track, MediaStreamTrack.t()}
4343
| {:track_muted, MediaStreamTrack.id()}
44+
| {:track_ended, MediaStreamTrack.id()}
4445
| {:rtp, MediaStreamTrack.id(), ExRTP.Packet.t()}}
4546

4647
@typedoc """
@@ -124,6 +125,12 @@ defmodule ExWebRTC.PeerConnection do
124125
GenServer.call(peer_connection, {:set_transceiver_direction, transceiver_id, direction})
125126
end
126127

128+
@spec stop_transceiver(peer_connection(), RTPTransceiver.id()) ::
129+
:ok | {:error, :invalid_transceiver_id}
130+
def stop_transceiver(peer_connection, transceiver_id) do
131+
GenServer.call(peer_connection, {:stop_transceiver, transceiver_id})
132+
end
133+
127134
@spec add_track(peer_connection(), MediaStreamTrack.t()) :: {:ok, RTPSender.t()}
128135
def add_track(peer_connection, track) do
129136
GenServer.call(peer_connection, {:add_track, track})
@@ -206,9 +213,6 @@ defmodule ExWebRTC.PeerConnection do
206213
:ok = state.ice_transport.restart(state.ice_pid)
207214
end
208215

209-
next_mid = find_next_mid(state)
210-
transceivers = assign_mids(state.transceivers, next_mid)
211-
212216
{:ok, ice_ufrag, ice_pwd} =
213217
state.ice_transport.get_local_credentials(state.ice_pid)
214218

@@ -229,7 +233,74 @@ defmodule ExWebRTC.PeerConnection do
229233
rtcp: true
230234
]
231235

232-
mlines = Enum.map(transceivers, &RTPTransceiver.to_offer_mline(&1, opts))
236+
next_mid = find_next_mid(state)
237+
238+
{transceivers, mlines} =
239+
if state.current_local_desc == nil do
240+
# TODO stopped transceivers
241+
{transceivers, _next_mid} =
242+
Enum.map_reduce(state.transceivers, next_mid, fn %{mid: nil} = tr, nm ->
243+
if tr.stopped do
244+
{tr, nm}
245+
else
246+
tr = RTPTransceiver.assign_mid(tr, to_string(nm))
247+
tr = %RTPTransceiver{tr | mline_idx: nm}
248+
{tr, nm + 1}
249+
end
250+
end)
251+
252+
mlines =
253+
transceivers
254+
|> Enum.reject(fn tr -> tr.stopped == true end)
255+
|> Enum.map(&RTPTransceiver.to_offer_mline(&1, opts))
256+
257+
{transceivers, mlines}
258+
else
259+
last_answer = get_last_answer(state)
260+
261+
{transceivers, _next_mid} =
262+
Enum.map_reduce(state.transceivers, {next_mid, []}, fn
263+
%{mid: nil} = tr, {nm, mline_indices} ->
264+
if tr.stopped do
265+
{tr, nm}
266+
else
267+
tr = RTPTransceiver.assign_mid(tr, to_string(nm))
268+
# idx might be nil, this means that
269+
# there is no mline to recycle
270+
idx = SDPUtils.find_free_mline_idx(last_answer, mline_indices)
271+
tr = %RTPTransceiver{tr | mline_idx: idx}
272+
{tr, {nm + 1, [idx | mline_indices]}}
273+
end
274+
275+
tr, acc ->
276+
{tr, acc}
277+
end)
278+
279+
transceivers =
280+
transceivers
281+
|> Enum.reject(fn tr -> tr.stopped == true end)
282+
|> Enum.sort_by(fn tr -> tr.mline_idx end)
283+
|> Enum.with_index()
284+
|> Enum.map(fn {tr, idx} -> %RTPTransceiver{tr | mline_idx: idx} end)
285+
286+
mlines = Enum.map(transceivers, fn tr -> RTPTransceiver.to_offer_mline(tr, opts) end)
287+
288+
final_mlines =
289+
last_answer.media
290+
|> Stream.with_index()
291+
|> Enum.map(fn {answer_mline, idx} ->
292+
case Enum.at(mlines, idx) do
293+
nil -> answer_mline
294+
mline -> mline
295+
end
296+
end)
297+
298+
mlines = Enum.slice(mlines, 0..(Enum.count(final_mlines) - 1))
299+
300+
mlines = final_mlines ++ (mlines -- final_mlines)
301+
302+
{transceivers, mlines}
303+
end
233304

234305
mids =
235306
Enum.map(mlines, fn mline ->
@@ -361,6 +432,17 @@ defmodule ExWebRTC.PeerConnection do
361432
{:reply, {:ok, transceiver}, state}
362433
end
363434

435+
@impl true
436+
def handle_call({:add_transceiver, %MediaStreamTrack{} = track, options}, _from, state) do
437+
options = Keyword.put(options, :ssrc, generate_ssrc(state))
438+
transceiver = RTPTransceiver.new(track.kind, track, state.config, options)
439+
state = %{state | transceivers: state.transceivers ++ [transceiver]}
440+
441+
state = update_negotiation_needed(state)
442+
443+
{:reply, {:ok, transceiver}, state}
444+
end
445+
364446
@impl true
365447
def handle_call({:set_transceiver_direction, tr_id, direction}, _from, state) do
366448
idx = Enum.find_index(state.transceivers, fn tr -> tr.id == tr_id end)
@@ -380,14 +462,28 @@ defmodule ExWebRTC.PeerConnection do
380462
end
381463

382464
@impl true
383-
def handle_call({:add_transceiver, %MediaStreamTrack{} = track, options}, _from, state) do
384-
options = Keyword.put(options, :ssrc, generate_ssrc(state))
385-
transceiver = RTPTransceiver.new(track.kind, track, state.config, options)
386-
state = %{state | transceivers: state.transceivers ++ [transceiver]}
465+
def handle_call({:stop_transceiver, tr_id}, _from, state) do
466+
idx = Enum.find_index(state.transceivers, fn tr -> tr.id == tr_id end)
387467

388-
state = update_negotiation_needed(state)
468+
case idx do
469+
nil ->
470+
{:reply, {:error, :invalid_transceiver_id}, state}
389471

390-
{:reply, {:ok, transceiver}, state}
472+
idx ->
473+
tr = Enum.at(state.transceivers, idx)
474+
475+
if tr.stopping do
476+
{:reply, :ok, state}
477+
else
478+
# TODO send RTCP BYE for each RTP stream
479+
# TODO stop receiving media
480+
tr = %RTPTransceiver{tr | direction: :inactive, stopping: true}
481+
transceivers = List.replace_at(state.transceivers, idx, tr)
482+
state = %{state | transceivers: transceivers}
483+
state = update_negotiation_needed(state)
484+
{:reply, :ok, state}
485+
end
486+
end
391487
end
392488

393489
@impl true
@@ -638,12 +734,15 @@ defmodule ExWebRTC.PeerConnection do
638734

639735
transceivers = update_transceiver_directions(state.transceivers, sdp, :local, type)
640736

737+
# TODO re-think order of those functions
738+
# and demuxer update
641739
state =
642740
state
643741
|> set_description(:local, type, sdp)
742+
|> Map.replace!(:transceivers, transceivers)
743+
|> remove_stopped_transceivers(type, sdp)
644744
|> update_signaling_state(next_sig_state)
645745
|> Map.update!(:demuxer, &Demuxer.update(&1, sdp))
646-
|> Map.replace!(:transceivers, transceivers)
647746

648747
if state.signaling_state == :stable do
649748
state = %{state | negotiation_needed: false}
@@ -672,9 +771,7 @@ defmodule ExWebRTC.PeerConnection do
672771
state = %{state | config: config}
673772

674773
transceivers =
675-
state
676-
|> update_transceivers(sdp)
677-
|> update_transceiver_directions(sdp, :remote, type)
774+
update_transceivers(sdp.media, state.transceivers, type, state.config, state.owner)
678775

679776
# TODO: this can result in ICE restart (when it should, e.g. when this is answer)
680777
:ok = state.ice_transport.set_remote_credentials(state.ice_pid, ice_ufrag, ice_pwd)
@@ -690,9 +787,10 @@ defmodule ExWebRTC.PeerConnection do
690787
state =
691788
state
692789
|> set_description(:remote, type, sdp)
790+
|> Map.replace!(:transceivers, transceivers)
791+
|> remove_stopped_transceivers(type, sdp)
693792
|> update_signaling_state(next_sig_state)
694793
|> Map.update!(:demuxer, &Demuxer.update(&1, sdp))
695-
|> Map.replace!(:transceivers, transceivers)
696794

697795
if state.signaling_state == :stable do
698796
state = %{state | negotiation_needed: false}
@@ -710,6 +808,29 @@ defmodule ExWebRTC.PeerConnection do
710808
end
711809
end
712810

811+
defp remove_stopped_transceivers(state, :answer, sdp) do
812+
# see W3C 4.4.1.5-4.7.12 xd
813+
transceivers =
814+
Enum.reject(state.transceivers, fn tr ->
815+
if tr.mid != nil do
816+
# TODO refactor
817+
mline =
818+
Enum.find(sdp.media, fn mline ->
819+
{:mid, mid} = ExSDP.get_attribute(mline, :mid)
820+
mid == tr.mid
821+
end)
822+
823+
tr.stopped == true and mline.port == 0
824+
else
825+
false
826+
end
827+
end)
828+
829+
%{state | transceivers: transceivers}
830+
end
831+
832+
defp remove_stopped_transceivers(state, :offer, _sdp), do: state
833+
713834
defp next_signaling_state(current_signaling_state, source, type)
714835
defp next_signaling_state(:stable, :remote, :offer), do: {:ok, :have_remote_offer}
715836
defp next_signaling_state(:stable, :local, :offer), do: {:ok, :have_local_offer}
@@ -797,25 +918,39 @@ defmodule ExWebRTC.PeerConnection do
797918
end
798919

799920
# this function is only called when applying remote description
800-
defp update_transceivers(state, sdp) do
801-
Enum.reduce(sdp.media, state.transceivers, fn mline, transceivers ->
802-
{:mid, mid} = ExSDP.get_attribute(mline, :mid)
921+
defp update_transceivers(mlines, transceivers, sdp_type, config, owner)
922+
defp update_transceivers([], transceivers, _sdp_type, _config, _owner), do: transceivers
803923

804-
direction = SDPUtils.get_media_direction(mline) |> reverse_direction()
924+
defp update_transceivers([mline | mlines], transceivers, sdp_type, config, owner) do
925+
{:mid, mid} = ExSDP.get_attribute(mline, :mid)
805926

806-
# TODO: consider recycled transceivers
807-
case find_transceiver(transceivers, mid) do
808-
{idx, %RTPTransceiver{} = tr} ->
809-
new_tr = RTPTransceiver.update(tr, mline, state.config)
810-
new_tr = process_remote_track(new_tr, direction, state.owner)
811-
List.replace_at(transceivers, idx, new_tr)
927+
direction =
928+
if SDPUtils.is_rejected(mline),
929+
do: :inactive,
930+
else: SDPUtils.get_media_direction(mline) |> reverse_direction()
812931

813-
nil ->
814-
new_tr = RTPTransceiver.from_mline(mline, state.config)
815-
new_tr = process_remote_track(new_tr, direction, state.owner)
816-
transceivers ++ [new_tr]
932+
# TODO: consider recycled transceivers
933+
# Note: in theory we should update transceiver codecs
934+
# after processing remote track but this shouldn't have any impact
935+
{idx, tr} =
936+
case find_transceiver(transceivers, mid) do
937+
{idx, %RTPTransceiver{} = tr} -> {idx, RTPTransceiver.update(tr, mline, config)}
938+
nil -> {nil, RTPTransceiver.from_mline(mline, config)}
817939
end
818-
end)
940+
941+
tr = process_remote_track(tr, direction, owner)
942+
tr = if sdp_type == :answer, do: %RTPTransceiver{tr | current_direction: direction}, else: tr
943+
tr = if SDPUtils.is_rejected(mline), do: RTPTransceiver.stop(tr), else: tr
944+
945+
case idx do
946+
nil ->
947+
transceivers = transceivers ++ [tr]
948+
update_transceivers(mlines, transceivers, sdp_type, config, owner)
949+
950+
idx ->
951+
transceivers = List.replace_at(transceivers, idx, tr)
952+
update_transceivers(mlines, transceivers, sdp_type, config, owner)
953+
end
819954
end
820955

821956
# see W3C WebRTC 5.1.1
@@ -867,19 +1002,6 @@ defmodule ExWebRTC.PeerConnection do
8671002
|> Enum.find(fn {_idx, tr} -> tr.mid == mid end)
8681003
end
8691004

870-
defp assign_mids(transceivers, next_mid) do
871-
{new_transceivers, _next_mid} =
872-
Enum.map_reduce(transceivers, next_mid, fn
873-
%{mid: nil} = t, nm ->
874-
{RTPTransceiver.assign_mid(t, to_string(nm)), nm + 1}
875-
876-
other, nm ->
877-
{other, nm}
878-
end)
879-
880-
new_transceivers
881-
end
882-
8831005
defp find_next_mid(state) do
8841006
# next mid must be unique, it's acomplished by looking for values
8851007
# greater than any mid in remote description or our own transceivers
@@ -913,6 +1035,9 @@ defmodule ExWebRTC.PeerConnection do
9131035
end)
9141036
end
9151037

1038+
defp get_last_answer(%{current_local_desc: {:answer, desc}}), do: desc
1039+
defp get_last_answer(%{current_remote_desc: {:answer, desc}}), do: desc
1040+
9161041
# TODO support :disconnected state - our ICE doesn't provide disconnected state for now
9171042
# TODO support :closed state
9181043
# the order of these clauses is important

lib/ex_webrtc/rtp_transceiver.ex

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,30 @@ defmodule ExWebRTC.RTPTransceiver do
2020
@type t() :: %__MODULE__{
2121
id: id(),
2222
mid: String.t() | nil,
23+
mline_idx: non_neg_integer() | nil,
2324
direction: direction(),
2425
current_direction: direction() | nil,
2526
fired_direction: direction() | nil,
2627
kind: kind(),
2728
rtp_hdr_exts: [ExSDP.Attribute.Extmap.t()],
2829
codecs: [RTPCodecParameters.t()],
2930
receiver: RTPReceiver.t(),
30-
sender: RTPSender.t()
31+
sender: RTPSender.t(),
32+
stopping: boolean(),
33+
stopped: boolean()
3134
}
3235

3336
@enforce_keys [:id, :direction, :kind, :sender, :receiver]
3437
defstruct @enforce_keys ++
3538
[
3639
:mid,
40+
:mline_idx,
3741
:current_direction,
3842
:fired_direction,
3943
codecs: [],
40-
rtp_hdr_exts: []
44+
rtp_hdr_exts: [],
45+
stopping: false,
46+
stopped: false
4147
]
4248

4349
@doc false
@@ -115,9 +121,9 @@ defmodule ExWebRTC.RTPTransceiver do
115121
@doc false
116122
@spec to_answer_mline(t(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t()
117123
def to_answer_mline(transceiver, mline, opts) do
118-
if transceiver.codecs == [] do
124+
if transceiver.codecs == [] or transceiver.stopping == true or transceiver.stopped == true do
119125
# reject mline and skip further processing
120-
# see RFC 8299 sec. 5.3.1 and RFC 3264 sec. 6
126+
# see RFC 8829 sec. 5.3.1 and RFC 3264 sec. 6
121127
%ExSDP.Media{mline | port: 0}
122128
else
123129
offered_direction = SDPUtils.get_media_direction(mline)
@@ -130,7 +136,9 @@ defmodule ExWebRTC.RTPTransceiver do
130136
@doc false
131137
@spec to_offer_mline(t(), Keyword.t()) :: ExSDP.Media.t()
132138
def to_offer_mline(transceiver, opts) do
133-
to_mline(transceiver, opts)
139+
# TODO refactor
140+
mline = to_mline(transceiver, opts)
141+
if transceiver.stopping, do: %ExSDP.Media{mline | port: 0}, else: mline
134142
end
135143

136144
@doc false
@@ -141,6 +149,20 @@ defmodule ExWebRTC.RTPTransceiver do
141149
%{transceiver | mid: mid, sender: sender}
142150
end
143151

152+
@spec stop(t()) :: t()
153+
def stop(transceiver) do
154+
tr = if transceiver.stopping, do: transceiver, else: stop_sending_and_receiving(transceiver)
155+
# should we reset stopping or leave it as true?
156+
%__MODULE__{tr | stopped: true, stopping: false, current_direction: nil}
157+
end
158+
159+
@spec stop_sending_and_receiving(t()) :: t()
160+
def stop_sending_and_receiving(transceiver) do
161+
# TODO send RTCP BYE for each RTP stream
162+
# TODO stop receiving media
163+
%__MODULE__{transceiver | direction: :inactive, stopping: true}
164+
end
165+
144166
defp to_mline(transceiver, opts) do
145167
pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end)
146168

0 commit comments

Comments
 (0)