diff --git a/Cargo.lock b/Cargo.lock index ae9b3f0ff..ccd13ff55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 4 [[package]] name = "addr2line" -version = "0.24.2" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" dependencies = [ "gimli", ] @@ -128,9 +128,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.99" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "arc-swap" @@ -235,9 +235,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "backtrace" -version = "0.3.75" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" dependencies = [ "addr2line", "cfg-if", @@ -245,7 +245,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -340,9 +340,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.37" +version = "1.2.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65193589c6404eb80b450d618eaf9a2cafaaafd57ecce47370519ef674a7bd44" +checksum = "e1354349954c6fc9cb0deab020f27f783cf0b604e8bb754dc4658ecf0d29c35f" dependencies = [ "find-msvc-tools", "shlex", @@ -399,8 +399,9 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", - "windows-link 0.2.0", + "windows-link", ] [[package]] @@ -458,18 +459,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.47" +version = "4.5.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931" +checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.47" +version = "4.5.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6" +checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" dependencies = [ "anstyle", "clap_lex 0.7.5", @@ -541,7 +542,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.5.47", + "clap 4.5.48", "criterion-plot", "futures", "is-terminal", @@ -691,9 +692,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" +checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" dependencies = [ "powerfmt", ] @@ -866,9 +867,9 @@ checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] name = "find-msvc-tools" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" +checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" [[package]] name = "fnv" @@ -1020,7 +1021,7 @@ dependencies = [ "cfg-if", "libc", "r-efi", - "wasi 0.14.5+wasi-0.2.4", + "wasi 0.14.7+wasi-0.2.4", ] [[package]] @@ -1035,9 +1036,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.31.1" +version = "0.32.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" [[package]] name = "group" @@ -1062,7 +1063,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.11.1", + "indexmap 2.11.4", "slab", "tokio", "tokio-util", @@ -1087,9 +1088,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.15.5" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" [[package]] name = "hermit-abi" @@ -1331,12 +1332,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.11.1" +version = "2.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206a8042aec68fa4a62e8d3f7aa4ceb508177d9324faf261e1959e495b7a1921" +checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" dependencies = [ "equivalent", - "hashbrown 0.15.5", + "hashbrown 0.16.0", ] [[package]] @@ -1444,9 +1445,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.78" +version = "0.3.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c0b063578492ceec17683ef2f8c5e89121fbd0b172cbc280635ab7567db2738" +checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305" dependencies = [ "once_cell", "wasm-bindgen", @@ -1460,9 +1461,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.175" +version = "0.2.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" [[package]] name = "litemap" @@ -1498,9 +1499,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.5" +version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" [[package]] name = "memoffset" @@ -1602,9 +1603,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.7" +version = "0.37.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" dependencies = [ "memchr", ] @@ -1906,9 +1907,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.40" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" dependencies = [ "proc-macro2", ] @@ -2002,9 +2003,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.2" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" +checksum = "8b5288124840bee7b386bc413c487869b360b2b4ec421ea56425128692f2a82c" dependencies = [ "aho-corasick", "memchr", @@ -2014,9 +2015,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6" +checksum = "833eb9ce86d40ef33cb1306d8accf7bc8ec2bfea4355cbdebb3df68b40925cad" dependencies = [ "aho-corasick", "memchr", @@ -2103,9 +2104,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.31" +version = "0.23.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc" +checksum = "cd3c25631629d034ce7cd9940adc9d45762d46de2b0f57193c4443b92c6d4d40" dependencies = [ "once_cell", "ring", @@ -2126,9 +2127,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.5" +version = "0.103.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5a37813727b78798e53c2bec3f5e8fe12a6d6f8389bf9ca7802add4c9905ad8" +checksum = "8572f3c2cb9934231157b45499fc41e1f58c589fdfb81a844ba873265e80f8eb" dependencies = [ "ring", "rustls-pki-types", @@ -2189,24 +2190,34 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.26" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -2215,14 +2226,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.143" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ "itoa", "memchr", "ryu", "serde", + "serde_core", ] [[package]] @@ -2428,11 +2440,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.43" +version = "0.3.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83bde6f1ec10e72d583d91623c939f623002284ef622b87de38cfd546cbf2031" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde", @@ -2715,27 +2728,27 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" -version = "0.14.5+wasi-0.2.4" +version = "0.14.7+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4494f6290a82f5fe584817a676a34b9d6763e8d9d18204009fb31dceca98fd4" +checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" dependencies = [ "wasip2", ] [[package]] name = "wasip2" -version = "1.0.0+wasi-0.2.4" +version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03fa2761397e5bd52002cd7e73110c71af2109aca4e521a9f40473fe685b0a24" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ "wit-bindgen", ] [[package]] name = "wasm-bindgen" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e14915cadd45b529bb8d1f343c4ed0ac1de926144b746e2710f9cd05df6603b" +checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d" dependencies = [ "cfg-if", "once_cell", @@ -2746,9 +2759,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28d1ba982ca7923fd01448d5c30c6864d0a14109560296a162f80f305fb93bb" +checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19" dependencies = [ "bumpalo", "log", @@ -2760,9 +2773,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c3d463ae3eff775b0c45df9da45d68837702ac35af998361e2c84e7c5ec1b0d" +checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2770,9 +2783,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bb4ce89b08211f923caf51d527662b75bdc9c9c7aab40f86dcb9fb85ac552aa" +checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" dependencies = [ "proc-macro2", "quote", @@ -2783,18 +2796,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f143854a3b13752c6950862c906306adb27c7e839f7414cec8fea35beab624c1" +checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1" dependencies = [ "unicode-ident", ] [[package]] name = "web-sys" -version = "0.3.78" +version = "0.3.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77e4b637749ff0d92b8fad63aa1f7cff3cbe125fd49c175cd6345e7272638b12" +checksum = "9367c417a924a74cae129e6a2ae3b47fabb1f8995595ab474029da749a8be120" dependencies = [ "js-sys", "wasm-bindgen", @@ -2807,6 +2820,7 @@ dependencies = [ "arc-swap", "async-trait", "bytes", + "chrono", "dtls", "env_logger", "hex", @@ -3014,7 +3028,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.0", + "windows-sys 0.61.1", ] [[package]] @@ -3025,22 +3039,22 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-core" -version = "0.62.0" +version = "0.62.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57fe7168f7de578d2d8a05b07fd61870d2e73b4020e9f49aa00da8471723497c" +checksum = "6844ee5416b285084d3d3fffd743b925a6c9385455f64f6d4fa3031c4c2749a9" dependencies = [ "windows-implement", "windows-interface", - "windows-link 0.2.0", + "windows-link", "windows-result", "windows-strings", ] [[package]] name = "windows-implement" -version = "0.60.0" +version = "0.60.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +checksum = "edb307e42a74fb6de9bf3a02d9712678b22399c87e6fa869d6dfcd8c1b7754e0" dependencies = [ "proc-macro2", "quote", @@ -3049,21 +3063,15 @@ dependencies = [ [[package]] name = "windows-interface" -version = "0.59.1" +version = "0.59.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +checksum = "c0abd1ddbc6964ac14db11c7213d6532ef34bd9aa042c2e5935f59d7908b46a5" dependencies = [ "proc-macro2", "quote", "syn", ] -[[package]] -name = "windows-link" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" - [[package]] name = "windows-link" version = "0.2.0" @@ -3076,7 +3084,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7084dcc306f89883455a206237404d3eaf961e5bd7e0f312f7c91f57eb44167f" dependencies = [ - "windows-link 0.2.0", + "windows-link", ] [[package]] @@ -3085,7 +3093,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7218c655a553b0bed4426cf54b20d7ba363ef543b52d515b3e48d7fd55318dda" dependencies = [ - "windows-link 0.2.0", + "windows-link", ] [[package]] @@ -3112,16 +3120,16 @@ version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" dependencies = [ - "windows-targets 0.53.3", + "windows-targets 0.53.4", ] [[package]] name = "windows-sys" -version = "0.61.0" +version = "0.61.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e201184e40b2ede64bc2ea34968b28e33622acdbbf37104f0e4a33f7abe657aa" +checksum = "6f109e41dd4a3c848907eb83d5a42ea98b3769495597450cf6d153507b166f0f" dependencies = [ - "windows-link 0.2.0", + "windows-link", ] [[package]] @@ -3142,11 +3150,11 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.3" +version = "0.53.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" +checksum = "2d42b7b7f66d2a06854650af09cfdf8713e427a439c97ad65a6375318033ac4b" dependencies = [ - "windows-link 0.1.3", + "windows-link", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", @@ -3255,9 +3263,9 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "wit-bindgen" -version = "0.45.1" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c573471f125075647d03df72e026074b7203790d41351cd6edc96f46bcccd36" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" [[package]] name = "writeable" diff --git a/Cargo.toml b/Cargo.toml index 79ae8baf0..f38c32a89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,5 @@ members = [ ] resolver = "2" -[workspace.dependencies] -tokio-test = { version = "0.4.4" } - [profile.dev] opt-level = 0 diff --git a/ice/src/candidate/candidate_base.rs b/ice/src/candidate/candidate_base.rs index 08125672c..8cc34d018 100644 --- a/ice/src/candidate/candidate_base.rs +++ b/ice/src/candidate/candidate_base.rs @@ -240,7 +240,9 @@ impl Candidate for CandidateBase { { let mut closed_ch = self.closed_ch.lock().await; if closed_ch.is_none() { - return Err(Error::ErrClosed); + // Если кандидат уже был ранее закрыт, не возвращать ошибку, а просто вернуть успех + return Ok(()); + // return Err(Error::ErrClosed); } closed_ch.take(); } diff --git a/interceptor/src/lib.rs b/interceptor/src/lib.rs index 98008d405..55dda4130 100644 --- a/interceptor/src/lib.rs +++ b/interceptor/src/lib.rs @@ -79,14 +79,11 @@ pub trait Interceptor { #[async_trait] pub trait RTPWriter { /// write a rtp packet - async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result; + async fn write(&self, pkt: &rtp::packet::Packet) -> Result; } pub type RTPWriterBoxFn = Box< - dyn (Fn( - &rtp::packet::Packet, - &Attributes, - ) -> Pin> + Send + Sync>>) + dyn (Fn(&rtp::packet::Packet) -> Pin> + Send + Sync>>) + Send + Sync, >; @@ -95,8 +92,8 @@ pub struct RTPWriterFn(pub RTPWriterBoxFn); #[async_trait] impl RTPWriter for RTPWriterFn { /// write a rtp packet - async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result { - self.0(pkt, attributes).await + async fn write(&self, pkt: &rtp::packet::Packet) -> Result { + self.0(pkt).await } } @@ -104,19 +101,11 @@ impl RTPWriter for RTPWriterFn { #[async_trait] pub trait RTPReader { /// read a rtp packet - async fn read( - &self, - buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)>; + async fn read(&self, buf: &mut [u8]) -> Result; } pub type RTPReaderBoxFn = Box< - dyn (Fn( - &mut [u8], - &Attributes, - ) - -> Pin> + Send + Sync>>) + dyn (Fn(&mut [u8]) -> Pin> + Send + Sync>>) + Send + Sync, >; @@ -125,12 +114,8 @@ pub struct RTPReaderFn(pub RTPReaderBoxFn); #[async_trait] impl RTPReader for RTPReaderFn { /// read a rtp packet - async fn read( - &self, - buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - self.0(buf, attributes).await + async fn read(&self, buf: &mut [u8]) -> Result { + self.0(buf).await } } @@ -138,17 +123,12 @@ impl RTPReader for RTPReaderFn { #[async_trait] pub trait RTCPWriter { /// write a batch of rtcp packets - async fn write( - &self, - pkts: &[Box], - attributes: &Attributes, - ) -> Result; + async fn write(&self, pkts: &[Box]) -> Result; } pub type RTCPWriterBoxFn = Box< dyn (Fn( &[Box], - &Attributes, ) -> Pin> + Send + Sync>>) + Send + Sync, @@ -159,12 +139,8 @@ pub struct RTCPWriterFn(pub RTCPWriterBoxFn); #[async_trait] impl RTCPWriter for RTCPWriterFn { /// write a batch of rtcp packets - async fn write( - &self, - pkts: &[Box], - attributes: &Attributes, - ) -> Result { - self.0(pkts, attributes).await + async fn write(&self, pkts: &[Box]) -> Result { + self.0(pkts).await } } @@ -175,22 +151,16 @@ pub trait RTCPReader { async fn read( &self, buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(Vec>, Attributes)>; + ) -> Result>>; } pub type RTCPReaderBoxFn = Box< dyn (Fn( &mut [u8], - &Attributes, ) -> Pin< Box< - dyn Future< - Output = Result<( - Vec>, - Attributes, - )>, - > + Send + dyn Future>>> + + Send + Sync, >, >) + Send @@ -205,9 +175,8 @@ impl RTCPReader for RTCPReaderFn { async fn read( &self, buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(Vec>, Attributes)> { - self.0(buf, attributes).await + ) -> Result>> { + self.0(buf).await } } diff --git a/interceptor/src/mock/mock_stream.rs b/interceptor/src/mock/mock_stream.rs index 436597e9d..b001d48a5 100644 --- a/interceptor/src/mock/mock_stream.rs +++ b/interceptor/src/mock/mock_stream.rs @@ -6,7 +6,7 @@ use util::Marshal; use crate::error::{Error, Result}; use crate::stream_info::StreamInfo; -use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; +use crate::{Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; type RTCPPackets = Vec>; @@ -91,10 +91,9 @@ impl MockStream { .await; tokio::spawn(async move { let mut buf = vec![0u8; 1500]; - let a = Attributes::new(); loop { - let pkts = match rtcp_reader.read(&mut buf, &a).await { - Ok((n, _)) => n, + let pkts = match rtcp_reader.read(&mut buf).await { + Ok(n) => n, Err(err) => { let _ = rtcp_in_modified_tx.send(Err(err)).await; break; @@ -113,10 +112,9 @@ impl MockStream { .await; tokio::spawn(async move { let mut buf = vec![0u8; 1500]; - let a = Attributes::new(); loop { - let pkt = match rtp_reader.read(&mut buf, &a).await { - Ok((pkt, _)) => pkt, + let pkt = match rtp_reader.read(&mut buf).await { + Ok(pkt) => pkt, Err(err) => { let _ = rtp_in_modified_tx.send(Err(err)).await; break; @@ -135,10 +133,9 @@ impl MockStream { &self, pkt: &[Box], ) -> Result { - let a = Attributes::new(); let rtcp_writer = self.rtcp_writer.lock().await; if let Some(writer) = &*rtcp_writer { - writer.write(pkt, &a).await + writer.write(pkt).await } else { Err(Error::Other("invalid rtcp_writer".to_owned())) } @@ -146,10 +143,9 @@ impl MockStream { /// write_rtp writes an rtp packet to the stream, using the interceptor pub async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result { - let a = Attributes::new(); let rtp_writer = self.rtp_writer.lock().await; if let Some(writer) = &*rtp_writer { - writer.write(pkt, &a).await + writer.write(pkt).await } else { Err(Error::Other("invalid rtp_writer".to_owned())) } @@ -229,11 +225,7 @@ impl MockStream { #[async_trait] impl RTCPWriter for MockStreamInternal { - async fn write( - &self, - pkts: &[Box], - _attributes: &Attributes, - ) -> Result { + async fn write(&self, pkts: &[Box]) -> Result { let _ = self.rtcp_out_modified_tx.send(pkts.to_vec()).await; Ok(0) @@ -245,8 +237,7 @@ impl RTCPReader for MockStreamInternal { async fn read( &self, buf: &mut [u8], - a: &Attributes, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { let pkts = { let mut rtcp_in = self.rtcp_in_rx.lock().await; rtcp_in.recv().await.ok_or(Error::ErrIoEOF)? @@ -259,13 +250,13 @@ impl RTCPReader for MockStreamInternal { } buf[..n].copy_from_slice(&marshaled); - Ok((pkts, a.clone())) + Ok(pkts) } } #[async_trait] impl RTPWriter for MockStreamInternal { - async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result { + async fn write(&self, pkt: &rtp::packet::Packet) -> Result { let _ = self.rtp_out_modified_tx.send(pkt.clone()).await; Ok(0) } @@ -273,11 +264,7 @@ impl RTPWriter for MockStreamInternal { #[async_trait] impl RTPReader for MockStreamInternal { - async fn read( - &self, - buf: &mut [u8], - a: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { + async fn read(&self, buf: &mut [u8]) -> Result { let pkt = { let mut rtp_in = self.rtp_in_rx.lock().await; rtp_in.recv().await.ok_or(Error::ErrIoEOF)? @@ -290,7 +277,7 @@ impl RTPReader for MockStreamInternal { } buf[..n].copy_from_slice(&marshaled); - Ok((pkt, a.clone())) + Ok(pkt) } } diff --git a/interceptor/src/nack/generator/generator_stream.rs b/interceptor/src/nack/generator/generator_stream.rs index 8cd5a108c..ce8930e83 100644 --- a/interceptor/src/nack/generator/generator_stream.rs +++ b/interceptor/src/nack/generator/generator_stream.rs @@ -150,16 +150,12 @@ impl GeneratorStream { #[async_trait] impl RTPReader for GeneratorStream { /// read a rtp packet - async fn read( - &self, - buf: &mut [u8], - a: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?; + async fn read(&self, buf: &mut [u8]) -> Result { + let pkt = self.parent_rtp_reader.read(buf).await?; self.add(pkt.header.sequence_number); - Ok((pkt, attr)) + Ok(pkt) } } diff --git a/interceptor/src/nack/generator/mod.rs b/interceptor/src/nack/generator/mod.rs index 16b665879..f1942c14e 100644 --- a/interceptor/src/nack/generator/mod.rs +++ b/interceptor/src/nack/generator/mod.rs @@ -17,9 +17,7 @@ use waitgroup::WaitGroup; use crate::error::{Error, Result}; use crate::nack::stream_support_nack; use crate::stream_info::StreamInfo; -use crate::{ - Attributes, Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter, -}; +use crate::{Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; /// GeneratorBuilder can be used to configure Generator Interceptor #[derive(Default)] @@ -136,9 +134,8 @@ impl Generator { nacks }; - let a = Attributes::new(); for nack in nacks{ - if let Err(err) = rtcp_writer.write(&[Box::new(nack)], &a).await{ + if let Err(err) = rtcp_writer.write(&[Box::new(nack)]).await{ log::warn!("failed sending nack: {err}"); } } diff --git a/interceptor/src/nack/responder/mod.rs b/interceptor/src/nack/responder/mod.rs index fa54a694a..80f08e779 100644 --- a/interceptor/src/nack/responder/mod.rs +++ b/interceptor/src/nack/responder/mod.rs @@ -15,9 +15,7 @@ use tokio::sync::Mutex; use crate::error::Result; use crate::nack::stream_support_nack; use crate::stream_info::StreamInfo; -use crate::{ - Attributes, Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter, -}; +use crate::{Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; /// GeneratorBuilder can be used to configure Responder Interceptor #[derive(Default)] @@ -73,8 +71,7 @@ impl ResponderInternal { let stream3 = Arc::clone(&stream2); Box::pin(async move { if let Some(p) = stream3.get(seq).await { - let a = Attributes::new(); - if let Err(err) = stream3.next_rtp_writer.write(&p, &a).await { + if let Err(err) = stream3.next_rtp_writer.write(&p).await { log::warn!("failed resending nacked packet: {err}"); } } @@ -101,9 +98,8 @@ impl RTCPReader for ResponderRtcpReader { async fn read( &self, buf: &mut [u8], - a: &Attributes, - ) -> Result<(Vec>, Attributes)> { - let (pkts, attr) = { self.parent_rtcp_reader.read(buf, a).await? }; + ) -> Result>> { + let pkts = { self.parent_rtcp_reader.read(buf).await? }; for p in &pkts { if let Some(nack) = p.as_any().downcast_ref::() { let nack = nack.clone(); @@ -114,7 +110,7 @@ impl RTCPReader for ResponderRtcpReader { } } - Ok((pkts, attr)) + Ok(pkts) } } diff --git a/interceptor/src/nack/responder/responder_stream.rs b/interceptor/src/nack/responder/responder_stream.rs index ec714da6d..9ad6cb50f 100644 --- a/interceptor/src/nack/responder/responder_stream.rs +++ b/interceptor/src/nack/responder/responder_stream.rs @@ -5,7 +5,7 @@ use tokio::sync::Mutex; use crate::error::Result; use crate::nack::UINT16SIZE_HALF; -use crate::{Attributes, RTPWriter}; +use crate::RTPWriter; struct ResponderStreamInternal { packets: Vec>, @@ -90,10 +90,10 @@ impl ResponderStream { #[async_trait] impl RTPWriter for ResponderStream { /// write a rtp packet - async fn write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result { + async fn write(&self, pkt: &rtp::packet::Packet) -> Result { self.add(pkt).await; - self.next_rtp_writer.write(pkt, a).await + self.next_rtp_writer.write(pkt).await } } diff --git a/interceptor/src/noop.rs b/interceptor/src/noop.rs index 597c57a8f..c6ff1b216 100644 --- a/interceptor/src/noop.rs +++ b/interceptor/src/noop.rs @@ -59,12 +59,8 @@ impl Interceptor for NoOp { #[async_trait] impl RTPReader for NoOp { - async fn read( - &self, - _buf: &mut [u8], - a: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - Ok((rtp::packet::Packet::default(), a.clone())) + async fn read(&self, _buf: &mut [u8]) -> Result { + Ok(rtp::packet::Packet::default()) } } @@ -73,8 +69,7 @@ impl RTCPReader for NoOp { async fn read( &self, _buf: &mut [u8], - a: &Attributes, - ) -> Result<(Vec>, Attributes)> { - Ok((vec![], a.clone())) + ) -> Result>> { + Ok(vec![]) } } diff --git a/interceptor/src/report/receiver/mod.rs b/interceptor/src/report/receiver/mod.rs index 5a5e21579..fd80227c8 100644 --- a/interceptor/src/report/receiver/mod.rs +++ b/interceptor/src/report/receiver/mod.rs @@ -30,9 +30,8 @@ impl RTCPReader for ReceiverReportRtcpReader { async fn read( &self, buf: &mut [u8], - a: &Attributes, - ) -> Result<(Vec>, Attributes)> { - let (pkts, attr) = self.parent_rtcp_reader.read(buf, a).await?; + ) -> Result>> { + let pkts = self.parent_rtcp_reader.read(buf).await?; let now = if let Some(f) = &self.internal.now { f() @@ -55,7 +54,7 @@ impl RTCPReader for ReceiverReportRtcpReader { } } - Ok((pkts, attr)) + Ok(pkts) } } @@ -112,8 +111,7 @@ impl ReceiverReport { for stream in streams { let pkt = stream.generate_report(now); - let a = Attributes::new(); - if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{ + if let Err(err) = rtcp_writer.write(&[Box::new(pkt)]).await{ log::warn!("failed sending: {err}"); } } diff --git a/interceptor/src/report/receiver/receiver_stream.rs b/interceptor/src/report/receiver/receiver_stream.rs index d170922e8..741da9258 100644 --- a/interceptor/src/report/receiver/receiver_stream.rs +++ b/interceptor/src/report/receiver/receiver_stream.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use util::sync::Mutex; use super::*; -use crate::{Attributes, RTPReader}; +use crate::RTPReader; struct ReceiverStreamInternal { ssrc: u32, @@ -208,12 +208,8 @@ impl ReceiverStream { #[async_trait] impl RTPReader for ReceiverStream { /// read a rtp packet - async fn read( - &self, - buf: &mut [u8], - a: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?; + async fn read(&self, buf: &mut [u8]) -> Result { + let pkt = self.parent_rtp_reader.read(buf).await?; let now = if let Some(f) = &self.now { f() @@ -222,6 +218,6 @@ impl RTPReader for ReceiverStream { }; self.process_rtp(now, &pkt); - Ok((pkt, attr)) + Ok(pkt) } } diff --git a/interceptor/src/report/sender/mod.rs b/interceptor/src/report/sender/mod.rs index f46c54933..94c7de630 100644 --- a/interceptor/src/report/sender/mod.rs +++ b/interceptor/src/report/sender/mod.rs @@ -72,8 +72,7 @@ impl SenderReport { for stream in streams { let pkt = stream.generate_report(now).await; - let a = Attributes::new(); - if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{ + if let Err(err) = rtcp_writer.write(&[Box::new(pkt)]).await{ log::warn!("failed sending: {err}"); } } diff --git a/interceptor/src/report/sender/sender_stream.rs b/interceptor/src/report/sender/sender_stream.rs index df0602afa..446ae40fc 100644 --- a/interceptor/src/report/sender/sender_stream.rs +++ b/interceptor/src/report/sender/sender_stream.rs @@ -7,7 +7,7 @@ use rtp::extension::abs_send_time_extension::unix2ntp; use tokio::sync::Mutex; use super::*; -use crate::{Attributes, RTPWriter}; +use crate::RTPWriter; struct SenderStreamInternal { ssrc: u32, @@ -92,7 +92,7 @@ impl SenderStream { #[async_trait] impl RTPWriter for SenderStream { /// write a rtp packet - async fn write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result { + async fn write(&self, pkt: &rtp::packet::Packet) -> Result { let now = if let Some(f) = &self.now { f() } else { @@ -100,7 +100,7 @@ impl RTPWriter for SenderStream { }; self.process_rtp(now, pkt).await; - self.next_rtp_writer.write(pkt, a).await + self.next_rtp_writer.write(pkt).await } } diff --git a/interceptor/src/stats/interceptor.rs b/interceptor/src/stats/interceptor.rs index a80af97ef..df2cbcef0 100644 --- a/interceptor/src/stats/interceptor.rs +++ b/interceptor/src/stats/interceptor.rs @@ -19,7 +19,7 @@ use util::MarshalSize; use super::{inbound, outbound, StatsContainer}; use crate::error::Result; use crate::stream_info::StreamInfo; -use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; +use crate::{Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; #[derive(Debug)] enum Message { @@ -390,9 +390,8 @@ where async fn read( &self, buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(Vec>, Attributes)> { - let (pkts, attributes) = self.rtcp_reader.read(buf, attributes).await?; + ) -> Result>> { + let pkts = self.rtcp_reader.read(buf).await?; // Middle 32 bits let now = (unix2ntp((self.now_gen)()) >> 16) as u32; @@ -595,7 +594,7 @@ where futures::future::join_all(futures).await; } - Ok((pkts, attributes)) + Ok(pkts) } } @@ -610,11 +609,7 @@ impl RTCPWriter for RTCPWriteInterceptor where F: Fn() -> SystemTime + Send + Sync, { - async fn write( - &self, - pkts: &[Box], - attributes: &Attributes, - ) -> Result { + async fn write(&self, pkts: &[Box]) -> Result { #[derive(Default, Debug)] struct Entry { fir_count: Option, @@ -691,7 +686,7 @@ where } } - self.rtcp_writer.write(pkts, attributes).await + self.rtcp_writer.write(pkts).await } } @@ -714,12 +709,8 @@ impl fmt::Debug for RTPReadRecorder { #[async_trait] impl RTPReader for RTPReadRecorder { - async fn read( - &self, - buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - let (pkt, attributes) = self.rtp_reader.read(buf, attributes).await?; + async fn read(&self, buf: &mut [u8]) -> Result { + let pkt = self.rtp_reader.read(buf).await?; let _ = self .tx @@ -734,7 +725,7 @@ impl RTPReader for RTPReadRecorder { }) .await; - Ok((pkt, attributes)) + Ok(pkt) } } @@ -758,8 +749,8 @@ impl fmt::Debug for RTPWriteRecorder { #[async_trait] impl RTPWriter for RTPWriteRecorder { /// write a rtp packet - async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result { - let n = self.rtp_writer.write(pkt, attributes).await?; + async fn write(&self, pkt: &rtp::packet::Packet) -> Result { + let n = self.rtp_writer.write(pkt).await?; let _ = self .tx diff --git a/interceptor/src/stream_reader.rs b/interceptor/src/stream_reader.rs index 4f4578631..80882bb73 100644 --- a/interceptor/src/stream_reader.rs +++ b/interceptor/src/stream_reader.rs @@ -2,16 +2,12 @@ use async_trait::async_trait; use srtp::stream::Stream; use crate::error::Result; -use crate::{Attributes, RTCPReader, RTPReader}; +use crate::{RTCPReader, RTPReader}; #[async_trait] impl RTPReader for Stream { - async fn read( - &self, - buf: &mut [u8], - a: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - Ok((self.read_rtp(buf).await?, a.clone())) + async fn read(&self, buf: &mut [u8]) -> Result { + Ok(self.read_rtp(buf).await?) } } @@ -20,8 +16,7 @@ impl RTCPReader for Stream { async fn read( &self, buf: &mut [u8], - a: &Attributes, - ) -> Result<(Vec>, Attributes)> { - Ok((self.read_rtcp(buf).await?, a.clone())) + ) -> Result>> { + Ok(self.read_rtcp(buf).await?) } } diff --git a/interceptor/src/twcc/receiver/mod.rs b/interceptor/src/twcc/receiver/mod.rs index 3126122e8..be8abb05b 100644 --- a/interceptor/src/twcc/receiver/mod.rs +++ b/interceptor/src/twcc/receiver/mod.rs @@ -113,7 +113,6 @@ impl Receiver { } }; - let a = Attributes::new(); let mut ticker = tokio::time::interval(internal.interval); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { @@ -138,7 +137,7 @@ impl Receiver { continue; } - if let Err(err) = rtcp_writer.write(&pkts, &a).await{ + if let Err(err) = rtcp_writer.write(&pkts).await{ log::error!("rtcp_writer.write got err: {err}"); } } diff --git a/interceptor/src/twcc/receiver/receiver_stream.rs b/interceptor/src/twcc/receiver/receiver_stream.rs index 764b26c9d..0df8ce14f 100644 --- a/interceptor/src/twcc/receiver/receiver_stream.rs +++ b/interceptor/src/twcc/receiver/receiver_stream.rs @@ -30,12 +30,8 @@ impl ReceiverStream { #[async_trait] impl RTPReader for ReceiverStream { /// read a rtp packet - async fn read( - &self, - buf: &mut [u8], - attributes: &Attributes, - ) -> Result<(rtp::packet::Packet, Attributes)> { - let (pkt, attr) = self.parent_rtp_reader.read(buf, attributes).await?; + async fn read(&self, buf: &mut [u8]) -> Result { + let pkt = self.parent_rtp_reader.read(buf).await?; if let Some(mut ext) = pkt.header.get_extension(self.hdr_ext_id) { let tcc_ext = TransportCcExtension::unmarshal(&mut ext)?; @@ -52,6 +48,6 @@ impl RTPReader for ReceiverStream { .await; } - Ok((pkt, attr)) + Ok(pkt) } } diff --git a/interceptor/src/twcc/sender/mod.rs b/interceptor/src/twcc/sender/mod.rs index d3ed5673d..d9afd4fd2 100644 --- a/interceptor/src/twcc/sender/mod.rs +++ b/interceptor/src/twcc/sender/mod.rs @@ -11,7 +11,7 @@ use sender_stream::SenderStream; use tokio::sync::Mutex; use util::Marshal; -use crate::{Attributes, RTPWriter, *}; +use crate::{RTPWriter, *}; pub(crate) const TRANSPORT_CC_URI: &str = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; diff --git a/interceptor/src/twcc/sender/sender_stream.rs b/interceptor/src/twcc/sender/sender_stream.rs index 29754070d..132a18bdf 100644 --- a/interceptor/src/twcc/sender/sender_stream.rs +++ b/interceptor/src/twcc/sender/sender_stream.rs @@ -24,7 +24,7 @@ impl SenderStream { #[async_trait] impl RTPWriter for SenderStream { /// write a rtp packet - async fn write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result { + async fn write(&self, pkt: &rtp::packet::Packet) -> Result { let sequence_number = self.next_sequence_nr.fetch_add(1, Ordering::SeqCst); let tcc_ext = TransportCcExtension { @@ -35,6 +35,6 @@ impl RTPWriter for SenderStream { let mut pkt = pkt.clone(); pkt.header.set_extension(self.hdr_ext_id, tcc_payload)?; - self.next_rtp_writer.write(&pkt, a).await + self.next_rtp_writer.write(&pkt).await } } diff --git a/sdp/src/description/media.rs b/sdp/src/description/media.rs index 47aa4905a..7fd0b35cd 100644 --- a/sdp/src/description/media.rs +++ b/sdp/src/description/media.rs @@ -80,6 +80,23 @@ impl MediaDescription { None } + pub fn attributes(&self, key: &str) -> Vec { + let mut atrs = vec![]; + for a in &self.attributes { + if a.key == key { + match a.value.as_ref().map(|s| s.to_owned()) { + Some(atr_value) => { + atrs.push(atr_value); + } + None => { + atrs.push(String::new()); + } + } + } + } + atrs + } + /// new_jsep_media_description creates a new MediaName with /// some settings that are required by the JSEP spec. pub fn new_jsep_media_description(codec_type: String, _codec_prefs: Vec<&str>) -> Self { diff --git a/util/Cargo.toml b/util/Cargo.toml index a68cc715d..aad744375 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -54,7 +54,7 @@ winapi = { version = "0.3.9", features = [ ] } [dev-dependencies] -tokio-test.workspace = true +tokio-test = { version = "0.4.4" } env_logger = "0.11.3" chrono = "0.4.28" criterion = { version = "0.5", features = ["async_futures"] } diff --git a/webrtc/Cargo.toml b/webrtc/Cargo.toml index 93f5ce05a..e9930e7ef 100644 --- a/webrtc/Cargo.toml +++ b/webrtc/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://github.com/webrtc-rs/webrtc" readme = "../README.md" [dependencies] +chrono = { version = "0.4.40", features = ["serde"] } data = { version = "0.12.0", path = "../data", package = "webrtc-data" } dtls = { version = "0.13.0", path = "../dtls" } ice = { version = "0.14.0", path = "../ice", package = "webrtc-ice" } diff --git a/webrtc/src/error.rs b/webrtc/src/error.rs index 22b712bfb..039f0c916 100644 --- a/webrtc/src/error.rs +++ b/webrtc/src/error.rs @@ -389,6 +389,9 @@ pub enum Error { #[error("SCTP is not established")] ErrSCTPNotEstablished, + #[error("LocalTrack binding not found")] + LocalTrackBindingNotFound, + #[error("DataChannel is not opened")] ErrClosedPipe, #[error("Interceptor is not bind")] diff --git a/webrtc/src/peer_connection/mod.rs b/webrtc/src/peer_connection/mod.rs index bed6f9f81..487a56e00 100644 --- a/webrtc/src/peer_connection/mod.rs +++ b/webrtc/src/peer_connection/mod.rs @@ -27,7 +27,7 @@ use ::sdp::description::session::*; use ::sdp::util::ConnectionRole; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use interceptor::{stats, Attributes, Interceptor, RTCPWriter}; +use interceptor::{stats, Interceptor, RTCPWriter}; use peer_connection_internal::*; use portable_atomic::{AtomicBool, AtomicU64, AtomicU8}; use rand::{rng, Rng}; @@ -76,7 +76,9 @@ use crate::peer_connection::sdp::*; use crate::peer_connection::signaling_state::{ check_next_signaling_state, RTCSignalingState, StateChangeOp, }; -use crate::rtp_transceiver::rtp_codec::{RTCRtpHeaderExtensionCapability, RTPCodecType}; +use crate::rtp_transceiver::rtp_codec::{ + RTCRtpCodecParameters, RTCRtpHeaderExtensionCapability, RTPCodecType, +}; use crate::rtp_transceiver::rtp_receiver::RTCRtpReceiver; use crate::rtp_transceiver::rtp_sender::RTCRtpSender; use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; @@ -1805,6 +1807,19 @@ impl RTCPeerConnection { self.internal.add_transceiver_from_kind(kind, init).await } + pub async fn add_transceiver_from_params( + &self, + direction: RTCRtpTransceiverDirection, + kind: RTPCodecType, + stream_id: String, + ssrc: u32, + codecs: Vec, + ) -> Result> { + self.internal + .add_transceiver_from_params(direction, kind, stream_id, ssrc, codecs) + .await + } + /// add_transceiver_from_track Create a new RtpTransceiver(SendRecv or SendOnly) and add it to the set of transceivers. pub async fn add_transceiver_from_track( &self, @@ -1917,8 +1932,7 @@ impl RTCPeerConnection { &self, pkts: &[Box], ) -> Result { - let a = Attributes::new(); - Ok(self.interceptor_rtcp_writer.write(pkts, &a).await?) + Ok(self.interceptor_rtcp_writer.write(pkts).await?) } /// close ends the PeerConnection diff --git a/webrtc/src/peer_connection/peer_connection_internal.rs b/webrtc/src/peer_connection/peer_connection_internal.rs index 722a39e4b..3a3860e4e 100644 --- a/webrtc/src/peer_connection/peer_connection_internal.rs +++ b/webrtc/src/peer_connection/peer_connection_internal.rs @@ -3,6 +3,7 @@ use std::sync::Weak; use super::*; use crate::api::setting_engine::SctpMaxMessageSize; +use crate::rtp_transceiver::rtp_codec::RTCRtpCodecParameters; use crate::rtp_transceiver::{create_stream_info, PayloadType}; use crate::stats::stats_collector::StatsCollector; use crate::stats::{ @@ -334,7 +335,14 @@ impl PeerConnectionInternal { return; } Err(err) => { - log::warn!("Failed to accept RTP {err}"); + match err { + // Если агент уже закрыт, то не показываем сообщение об ошибке + srtp::Error::SessionSrtpAlreadyClosed => {} + _ => { + log::warn!("Failed to accept RTP {err}"); + } + } + return; } }; @@ -400,7 +408,14 @@ impl PeerConnectionInternal { ); } Err(err) => { - log::warn!("Failed to accept RTCP {err}"); + match err { + // TODO: убедиться, что нет проблемы с этим кодом + srtp::Error::SessionSrtpAlreadyClosed => {} + _ => { + log::warn!("Failed to accept RTCP {err}"); + } + } + return; } }; @@ -633,6 +648,107 @@ impl PeerConnectionInternal { .await) } + pub(super) async fn add_transceiver_from_params( + &self, + direction: RTCRtpTransceiverDirection, + kind: RTPCodecType, + stream_id: String, + ssrc: u32, + codecs: Vec, + ) -> Result> { + if self.is_closed.load(Ordering::SeqCst) { + return Err(Error::ErrConnectionClosed); + } + + let t = match direction { + RTCRtpTransceiverDirection::Sendonly | RTCRtpTransceiverDirection::Sendrecv => { + let interceptor = self + .interceptor + .upgrade() + .ok_or(Error::ErrInterceptorNotBind)?; + + if direction == RTCRtpTransceiverDirection::Unspecified { + return Err(Error::ErrPeerConnAddTransceiverFromTrackSupport); + } + + let r = Arc::new(RTCRtpReceiver::new( + self.setting_engine.get_receive_mtu(), + kind, + Arc::clone(&self.dtls_transport), + Arc::clone(&self.media_engine), + Arc::clone(&interceptor), + )); + + let s = Arc::new( + RTCRtpSender::new_by_ssrc( + kind, + Arc::clone(&self.dtls_transport), + Arc::clone(&self.media_engine), + Arc::clone(&self.setting_engine), + Arc::clone(&interceptor), + false, + stream_id, + ssrc, + ) + .await, + ); + + RTCRtpTransceiver::new( + r, + s, + direction, + kind, + codecs, + Arc::clone(&self.media_engine), + Some(Box::new(self.make_negotiation_needed_trigger())), + ) + .await + } + RTCRtpTransceiverDirection::Recvonly => { + let interceptor = self + .interceptor + .upgrade() + .ok_or(Error::ErrInterceptorNotBind)?; + let receiver = Arc::new(RTCRtpReceiver::new( + self.setting_engine.get_receive_mtu(), + kind, + Arc::clone(&self.dtls_transport), + Arc::clone(&self.media_engine), + Arc::clone(&interceptor), + )); + + let sender = Arc::new( + RTCRtpSender::new( + None, + kind, + Arc::clone(&self.dtls_transport), + Arc::clone(&self.media_engine), + Arc::clone(&self.setting_engine), + interceptor, + false, + ) + .await, + ); + + RTCRtpTransceiver::new( + receiver, + sender, + direction, + kind, + vec![], + Arc::clone(&self.media_engine), + Some(Box::new(self.make_negotiation_needed_trigger())), + ) + .await + } + _ => return Err(Error::ErrPeerConnAddTransceiverFromKindSupport), + }; + + self.add_rtp_transceiver(Arc::clone(&t)).await; + + Ok(t) + } + /// add_rtp_transceiver appends t into rtp_transceivers /// and fires onNegotiationNeeded; /// caller of this method should hold `self.mu` lock @@ -1097,20 +1213,18 @@ impl PeerConnectionInternal { // Packets that we read as part of simulcast probing that we need to make available // if we do find a track later. - let mut buffered_packets: VecDeque<(rtp::packet::Packet, Attributes)> = VecDeque::default(); + let mut buffered_packets: VecDeque = VecDeque::default(); let mut buf = vec![0u8; self.setting_engine.get_receive_mtu()]; for _ in 0..=SIMULCAST_PROBE_COUNT { - let (pkt, a) = rtp_interceptor - .read(&mut buf, &stream_info.attributes) - .await?; + let pkt = rtp_interceptor.read(&mut buf).await?; let (mid, rid, rsid) = get_stream_mid_rid( &pkt.header, mid_extension_id as u8, sid_extension_id as u8, rsid_extension_id as u8, )?; - buffered_packets.push_back((pkt, a.clone())); + buffered_packets.push_back(pkt); if mid.is_empty() || (rid.is_empty() && rsid.is_empty()) { continue; @@ -1192,7 +1306,7 @@ impl PeerConnectionInternal { tokio::spawn(async move { let mut b = vec![0u8; receive_mtu]; let pkt = match track.peek(&mut b).await { - Ok((pkt, _)) => pkt, + Ok(pkt) => pkt, Err(err) => { log::warn!( "Could not determine PayloadType for SSRC {} ({})", @@ -1531,11 +1645,7 @@ type IResult = std::result::Result; #[async_trait] impl RTCPWriter for PeerConnectionInternal { - async fn write( - &self, - pkts: &[Box], - _a: &Attributes, - ) -> IResult { + async fn write(&self, pkts: &[Box]) -> IResult { Ok(self.dtls_transport.write_rtcp(pkts).await?) } } diff --git a/webrtc/src/peer_connection/peer_connection_test.rs b/webrtc/src/peer_connection/peer_connection_test.rs index cb001fe4b..32d06314c 100644 --- a/webrtc/src/peer_connection/peer_connection_test.rs +++ b/webrtc/src/peer_connection/peer_connection_test.rs @@ -315,7 +315,7 @@ async fn test_get_stats() -> Result<()> { pc_answer.on_track(Box::new(move |track, _, _| { let packet_tx = packet_tx.clone(); tokio::spawn(async move { - while let Ok((pkt, _)) = track.read_rtp().await { + while let Ok(pkt) = track.read_rtp().await { dbg!(&pkt); let last = pkt.payload[pkt.payload.len() - 1]; diff --git a/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs b/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs index 97306a4a0..077f64751 100644 --- a/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use arc_swap::ArcSwapOption; use interceptor::stream_info::{AssociatedStreamInfo, RTPHeaderExtension}; -use interceptor::{Attributes, Interceptor}; +use interceptor::Interceptor; use log::trace; use smol_str::SmolStr; use tokio::sync::{watch, Mutex, RwLock}; @@ -162,10 +162,7 @@ pub struct RTPReceiverInternal { impl RTPReceiverInternal { /// read reads incoming RTCP for this RTPReceiver - async fn read( - &self, - b: &mut [u8], - ) -> Result<(Vec>, Attributes)> { + async fn read(&self, b: &mut [u8]) -> Result>> { let mut state_watch_rx = self.state_tx.subscribe(); // Ensure we are running or paused. When paused we still receive RTCP even if RTP traffic // isn't flowing. @@ -174,13 +171,12 @@ impl RTPReceiverInternal { let tracks = self.tracks.read().await; if let Some(t) = tracks.first() { if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor { - let a = Attributes::new(); loop { tokio::select! { res = State::error_on_close(&mut state_watch_rx) => { res? } - result = rtcp_interceptor.read(b, &a) => { + result = rtcp_interceptor.read(b) => { return Ok(result?) } } @@ -198,7 +194,7 @@ impl RTPReceiverInternal { &self, b: &mut [u8], rid: &str, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { let mut state_watch_rx = self.state_tx.subscribe(); // Ensure we are running or paused. When paused we still receive RTCP even if RTP traffic @@ -209,14 +205,12 @@ impl RTPReceiverInternal { for t in &*tracks { if t.track.rid() == rid { if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor { - let a = Attributes::new(); - loop { tokio::select! { res = State::error_on_close(&mut state_watch_rx) => { res? } - result = rtcp_interceptor.read(b, &a) => { + result = rtcp_interceptor.read(b) => { return Ok(result?); } } @@ -234,11 +228,11 @@ impl RTPReceiverInternal { async fn read_rtcp( &self, receive_mtu: usize, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { let mut b = vec![0u8; receive_mtu]; - let (pkts, attributes) = self.read(&mut b).await?; + let pkts = self.read(&mut b).await?; - Ok((pkts, attributes)) + Ok(pkts) } /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you @@ -246,18 +240,14 @@ impl RTPReceiverInternal { &self, rid: &str, receive_mtu: usize, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { let mut b = vec![0u8; receive_mtu]; - let (pkts, attributes) = self.read_simulcast(&mut b, rid).await?; + let pkts = self.read_simulcast(&mut b, rid).await?; - Ok((pkts, attributes)) + Ok(pkts) } - pub(crate) async fn read_rtp( - &self, - b: &mut [u8], - tid: usize, - ) -> Result<(rtp::packet::Packet, Attributes)> { + pub(crate) async fn read_rtp(&self, b: &mut [u8], tid: usize) -> Result { let mut state_watch_rx = self.state_tx.subscribe(); // Ensure we are running. @@ -283,7 +273,6 @@ impl RTPReceiverInternal { );*/ if let Some(rtp_interceptor) = rtp_interceptor { - let a = Attributes::new(); //println!( // "read_rtp rtp_interceptor.read enter with tid {} ssrc {}", // tid, ssrc @@ -299,11 +288,11 @@ impl RTPReceiverInternal { } current_state = new_state; } - result = rtp_interceptor.read(b, &a) => { + result = rtp_interceptor.read(b) => { let result = result?; if current_state == State::Paused { - trace!("Dropping {} read bytes received while RTPReceiver was paused", result.0); + trace!("Dropping {} read bytes received while RTPReceiver was paused", result); continue; } return Ok(result); @@ -630,7 +619,7 @@ impl RTCRtpReceiver { pub async fn read( &self, b: &mut [u8], - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { self.internal.read(b).await } @@ -639,15 +628,13 @@ impl RTCRtpReceiver { &self, b: &mut [u8], rid: &str, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { self.internal.read_simulcast(b, rid).await } /// read_rtcp is a convenience method that wraps Read and unmarshal for you. /// It also runs any configured interceptors. - pub async fn read_rtcp( - &self, - ) -> Result<(Vec>, Attributes)> { + pub async fn read_rtcp(&self) -> Result>> { self.internal.read_rtcp(self.receive_mtu).await } @@ -655,7 +642,7 @@ impl RTCRtpReceiver { pub async fn read_simulcast_rtcp( &self, rid: &str, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { self.internal .read_simulcast_rtcp(rid, self.receive_mtu) .await @@ -755,11 +742,7 @@ impl RTCRtpReceiver { } /// read_rtp should only be called by a track, this only exists so we can keep state in one place - pub(crate) async fn read_rtp( - &self, - b: &mut [u8], - tid: usize, - ) -> Result<(rtp::packet::Packet, Attributes)> { + pub(crate) async fn read_rtp(&self, b: &mut [u8], tid: usize) -> Result { self.internal.read_rtp(b, tid).await } @@ -807,12 +790,11 @@ impl RTCRtpReceiver { let receive_mtu = self.receive_mtu; let track = t.clone(); tokio::spawn(async move { - let a = Attributes::new(); let mut b = vec![0u8; receive_mtu]; while let Some(repair_rtp_interceptor) = &track.repair_stream.rtp_interceptor { //TODO: cancel repair_rtp_interceptor.read gracefully //println!("repair_rtp_interceptor read begin with ssrc={}", ssrc); - if repair_rtp_interceptor.read(&mut b, &a).await.is_err() { + if repair_rtp_interceptor.read(&mut b).await.is_err() { break; } } diff --git a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs index d1e35d18f..bacc966e9 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Weak}; use ice::rand::generate_crypto_random_string; use interceptor::stream_info::{AssociatedStreamInfo, StreamInfo}; -use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter}; +use interceptor::{Interceptor, RTCPReader, RTPWriter}; use portable_atomic::AtomicBool; use tokio::select; use tokio::sync::{watch, Mutex, Notify}; @@ -25,6 +25,7 @@ use crate::rtp_transceiver::{ create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters, RTCRtpTransceiver, SSRC, }; +use crate::track::track_local::track_local_simple::TrackLocalSimple; use crate::track::track_local::{InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext}; pub(crate) struct RTPSenderInternal { @@ -180,6 +181,77 @@ impl RTCRtpSender { ret } + pub async fn new_by_ssrc( + kind: RTPCodecType, + transport: Arc, + media_engine: Arc, + setting_engine: Arc, + interceptor: Arc, + start_paused: bool, + stream_id: String, + ssrc: u32, + ) -> Self { + let id = generate_crypto_random_string( + 32, + b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", + ); + let (send_called, _) = watch::channel(false); + let stop_called_tx = Arc::new(Notify::new()); + let stop_called_rx = stop_called_tx.clone(); + let stop_called_signal = Arc::new(AtomicBool::new(false)); + + let internal = Arc::new(RTPSenderInternal { + stop_called_rx, + stop_called_signal: Arc::clone(&stop_called_signal), + }); + + let seq_trans = Arc::new(SequenceTransformer::new()); + let rtx_seq_trans = Arc::new(SequenceTransformer::new()); + + let stream_ids = vec![stream_id.clone()]; + let ret = Self { + track_encodings: Mutex::new(vec![]), + + seq_trans, + rtx_seq_trans, + + transport, + + kind, + payload_type: 0, + receive_mtu: setting_engine.get_receive_mtu(), + enable_rtx: setting_engine.enable_sender_rtx, + + negotiated: AtomicBool::new(false), + + media_engine, + interceptor, + + id, + initial_track_id: std::sync::Mutex::new(None), + associated_media_stream_ids: std::sync::Mutex::new(stream_ids), + + rtp_transceiver: SyncMutex::new(None), + + send_called, + stop_called_tx, + stop_called_signal, + + paused: Arc::new(AtomicBool::new(start_paused)), + + internal, + }; + + { + let mut track_encodings = ret.track_encodings.lock().await; + let _ = ret + .add_encoding_internal_by_kind(&mut track_encodings, kind, ssrc, stream_id) + .await; + } + + ret + } + /// AddEncoding adds an encoding to RTPSender. Used by simulcast senders. pub async fn add_encoding(&self, track: Arc) -> Result<()> { let mut track_encodings = self.track_encodings.lock().await; @@ -297,6 +369,52 @@ impl RTCRtpSender { Ok(()) } + async fn add_encoding_internal_by_kind( + &self, + track_encodings: &mut Vec, + kind: RTPCodecType, + ssrc: u32, + stream_id: String, + ) -> Result<()> { + let srtp_stream = Arc::new(SrtpWriterFuture { + closed: AtomicBool::new(false), + ssrc, + rtp_sender: Arc::downgrade(&self.internal), + rtp_transport: Arc::clone(&self.transport), + rtcp_read_stream: Mutex::new(None), + rtp_write_session: Mutex::new(None), + seq_trans: Arc::clone(&self.seq_trans), + }); + + let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; + let rtcp_interceptor = self.interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; + + let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); + let context = TrackLocalContext { + id: self.id.clone(), + params: super::RTCRtpParameters::default(), + ssrc: 0, + write_stream, + paused: self.paused.clone(), + mid: None, + }; + + let track = TrackLocalSimple::new(kind, format!("{stream_id}_{kind}"), stream_id, ssrc); + let encoding = TrackEncoding { + track: Arc::new(track), + srtp_stream, + rtcp_interceptor, + stream_info: StreamInfo::default(), + context, + ssrc, + rtx: None, + }; + + track_encodings.push(encoding); + + Ok(()) + } + pub(crate) fn is_negotiated(&self) -> bool { self.negotiated.load(Ordering::SeqCst) } @@ -539,11 +657,10 @@ impl RTCRtpSender { let stop_called_rx = self.internal.stop_called_rx.clone(); tokio::spawn(async move { - let attrs = Attributes::new(); let mut b = vec![0u8; receive_mtu]; while !stop_called_signal.load(Ordering::SeqCst) { select! { - r = rtcp_reader.read(&mut b, &attrs) => { + r = rtcp_reader.read(&mut b) => { if r.is_err() { break } @@ -591,17 +708,16 @@ impl RTCRtpSender { pub async fn read( &self, b: &mut [u8], - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { tokio::select! { _ = self.wait_for_send() => { let rtcp_interceptor = { let track_encodings = self.track_encodings.lock().await; track_encodings.first().map(|e|e.rtcp_interceptor.clone()) }.ok_or(Error::ErrInterceptorNotBind)?; - let a = Attributes::new(); tokio::select! { _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), - result = rtcp_interceptor.read(b, &a) => Ok(result?), + result = rtcp_interceptor.read(b) => Ok(result?), } } _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), @@ -609,13 +725,11 @@ impl RTCRtpSender { } /// read_rtcp is a convenience method that wraps Read and unmarshals for you. - pub async fn read_rtcp( - &self, - ) -> Result<(Vec>, Attributes)> { + pub async fn read_rtcp(&self) -> Result>> { let mut b = vec![0u8; self.receive_mtu]; - let (pkts, attributes) = self.read(&mut b).await?; + let pkts = self.read(&mut b).await?; - Ok((pkts, attributes)) + Ok(pkts) } /// ReadSimulcast reads incoming RTCP for this RTPSender for given rid @@ -623,17 +737,16 @@ impl RTCRtpSender { &self, b: &mut [u8], rid: &str, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { tokio::select! { _ = self.wait_for_send() => { let rtcp_interceptor = { let track_encodings = self.track_encodings.lock().await; track_encodings.iter().find(|e| e.track.rid() == Some(rid)).map(|e| e.rtcp_interceptor.clone()) }.ok_or(Error::ErrRTPSenderNoTrackForRID)?; - let a = Attributes::new(); tokio::select! { _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), - result = rtcp_interceptor.read(b, &a) => Ok(result?), + result = rtcp_interceptor.read(b) => Ok(result?), } } _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), @@ -644,11 +757,11 @@ impl RTCRtpSender { pub async fn read_rtcp_simulcast( &self, rid: &str, - ) -> Result<(Vec>, Attributes)> { + ) -> Result>> { let mut b = vec![0u8; self.receive_mtu]; - let (pkts, attributes) = self.read_simulcast(&mut b, rid).await?; + let pkts = self.read_simulcast(&mut b, rid).await?; - Ok((pkts, attributes)) + Ok(pkts) } /// Enables overriding outgoing `RTP` packets' `sequence number`s. diff --git a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs index 9feda0428..d865d7c53 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs @@ -72,7 +72,7 @@ async fn test_rtp_sender_replace_track() -> Result<()> { let seen_packet_b_tx2 = Arc::clone(&seen_packet_b_tx); Box::pin(async move { let pkt = match track.read_rtp().await { - Ok((pkt, _)) => pkt, + Ok(pkt) => pkt, Err(err) => { //assert!(errors.Is(io.EOF, err)) log::debug!("{err}"); diff --git a/webrtc/src/rtp_transceiver/srtp_writer_future.rs b/webrtc/src/rtp_transceiver/srtp_writer_future.rs index ff4361c78..e5fbe30c9 100644 --- a/webrtc/src/rtp_transceiver/srtp_writer_future.rs +++ b/webrtc/src/rtp_transceiver/srtp_writer_future.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Weak}; use async_trait::async_trait; use bytes::Bytes; -use interceptor::{Attributes, RTCPReader, RTPWriter}; +use interceptor::{RTCPReader, RTPWriter}; use portable_atomic::AtomicBool; use srtp::session::Session; use srtp::stream::Stream; @@ -263,18 +263,17 @@ impl RTCPReader for SrtpWriterFuture { async fn read( &self, buf: &mut [u8], - a: &Attributes, - ) -> IResult<(Vec>, Attributes)> { + ) -> IResult>> { let read = self.read(buf).await?; let pkt = rtcp::packet::unmarshal(&mut &buf[..read])?; - Ok((pkt, a.clone())) + Ok(pkt) } } #[async_trait] impl RTPWriter for SrtpWriterFuture { - async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> IResult { + async fn write(&self, pkt: &rtp::packet::Packet) -> IResult { Ok( match self.seq_trans.seq_number(pkt.header.sequence_number) { Some(seq_num) => { diff --git a/webrtc/src/track/track_local/mod.rs b/webrtc/src/track/track_local/mod.rs index 9963e491a..8b1c1ae4d 100644 --- a/webrtc/src/track/track_local/mod.rs +++ b/webrtc/src/track/track_local/mod.rs @@ -1,6 +1,8 @@ #[cfg(test)] mod track_local_static_test; +pub mod packet_cache; +pub mod track_local_simple; pub mod track_local_static_rtp; pub mod track_local_static_sample; @@ -10,7 +12,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use async_trait::async_trait; -use interceptor::{Attributes, RTPWriter}; +use interceptor::RTPWriter; use portable_atomic::AtomicBool; use smol_str::SmolStr; use tokio::sync::Mutex; @@ -23,25 +25,13 @@ use crate::rtp_transceiver::*; /// TrackLocalWriter is the Writer for outbound RTP Packets #[async_trait] pub trait TrackLocalWriter: fmt::Debug { - /// write_rtp_with_attributes encrypts a RTP packet and writes to the connection. - /// attributes are delivered to the interceptor chain - async fn write_rtp_with_attributes( - &self, - pkt: &rtp::packet::Packet, - attr: &Attributes, - ) -> Result; - /// write_rtp encrypts a RTP packet and writes to the connection - async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result { - let attr = Attributes::new(); - self.write_rtp_with_attributes(pkt, &attr).await - } + async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result; /// write encrypts and writes a full RTP packet async fn write(&self, mut b: &[u8]) -> Result { let pkt = rtp::packet::Packet::unmarshal(&mut b)?; - let attr = Attributes::new(); - self.write_rtp_with_attributes(&pkt, &attr).await + self.write_rtp(&pkt).await } } @@ -146,6 +136,10 @@ impl TrackBinding { pub fn is_sender_paused(&self) -> bool { self.sender_paused.load(Ordering::SeqCst) } + + pub fn set_sender_paused(&self, sender_paused: bool) { + self.sender_paused.store(sender_paused, Ordering::SeqCst); + } } pub(crate) struct InterceptorToTrackLocalWriter { @@ -174,18 +168,14 @@ impl std::fmt::Debug for InterceptorToTrackLocalWriter { #[async_trait] impl TrackLocalWriter for InterceptorToTrackLocalWriter { - async fn write_rtp_with_attributes( - &self, - pkt: &rtp::packet::Packet, - attr: &Attributes, - ) -> Result { + async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result { if self.is_sender_paused() { return Ok(0); } let interceptor_rtp_writer = self.interceptor_rtp_writer.lock().await; if let Some(writer) = &*interceptor_rtp_writer { - Ok(writer.write(pkt, attr).await?) + Ok(writer.write(pkt).await?) } else { Ok(0) } diff --git a/webrtc/src/track/track_local/packet_cache.rs b/webrtc/src/track/track_local/packet_cache.rs new file mode 100644 index 000000000..0c4d219a2 --- /dev/null +++ b/webrtc/src/track/track_local/packet_cache.rs @@ -0,0 +1,86 @@ +use rtcp::transport_feedbacks::transport_layer_nack::NackPair; +use rtp::packet::Packet; +use std::time::{Duration, Instant}; +use util::sync::RwLock; + +#[derive(Clone, Debug)] +pub struct PCache { + // Финальный пакет, уже с переписанными SSRC/SN/TS и всеми нужными расширениями + // Можно добавить счётчик/время ретрансляций для отладочной метрики (AtomicU32/Option) + pub packet: Packet, // полный RTP (ingress) + pub first_sent_at: Instant, // для TTL +} + +#[derive(Debug)] +pub struct PCacheBuffer { + ttl: Duration, + capacity: usize, // степень двойки предпочтительно; capacity - 1, если capacity — power-of-two + slots: RwLock>>, +} + +impl PCacheBuffer { + pub fn new(ttl: Duration, capacity_pow2: usize) -> Self { + assert!(capacity_pow2.is_power_of_two()); + Self { + ttl, + slots: RwLock::new(vec![None; capacity_pow2]), + capacity: capacity_pow2 - 1, + } + } + + #[inline] + fn idx(&self, seq: u16) -> usize { + (seq as usize) & self.capacity + } + + pub fn put(&self, packet: Packet) { + let idx = self.idx(packet.header.sequence_number); + let mut slots = self.slots.write(); + slots[idx] = Some(PCache { + packet, + first_sent_at: Instant::now(), + }); + } + + pub fn get(&self, seq: u16) -> Option { + let idx = self.idx(seq); + let slots = self.slots.read(); + let some = slots.get(idx)?.as_ref()?; + if some.packet.header.sequence_number != seq { + println!( + "Коллизия кольца: запрошен seq={seq}. В кеше seq={}", + some.packet.header.sequence_number + ); + return None; // коллизия кольца (wrap) + } + let elapsed = some.first_sent_at.elapsed(); + if elapsed > self.ttl { + println!( + "Пакет просрочен. Прошло {:?}, что больше ttl = {:?}", + elapsed, self.ttl + ); + return None; // просрочен + } + Some(some.packet.clone()) + } +} + +// Вспомогательная функция разворачивания NACK-пар (packet_id + bitmask -> список seq) +// Разворачиваем список потерянных SN из NACK-пар +pub fn expand_nack_pairs(pairs: &[NackPair]) -> Vec { + let mut out = Vec::with_capacity(pairs.len() * 8); + for p in pairs { + let base = p.packet_id; + out.push(base); + let mut mask = p.lost_packets; + let mut i = 0; + while mask != 0 { + if (mask & 1) != 0 { + out.push(base.wrapping_add(i + 1)); + } + mask >>= 1; + i += 1; + } + } + out +} diff --git a/webrtc/src/track/track_local/track_local_simple.rs b/webrtc/src/track/track_local/track_local_simple.rs new file mode 100644 index 000000000..d1f494253 --- /dev/null +++ b/webrtc/src/track/track_local/track_local_simple.rs @@ -0,0 +1,67 @@ +use super::*; +use std::any::Any; + +/// TrackLocalSimple is simples mock of TrackLocal +#[derive(Debug)] +pub struct TrackLocalSimple { + kind: RTPCodecType, + id: String, + stream_id: String, + ssrc: u32, +} + +impl TrackLocalSimple { + /// returns a TrackLocalStaticRTP without rid. + pub fn new(kind: RTPCodecType, id: String, stream_id: String, ssrc: u32) -> Self { + TrackLocalSimple { + kind, + id, + stream_id, + ssrc, + } + } +} + +#[async_trait] +impl TrackLocal for TrackLocalSimple { + async fn bind(&self, _t: &TrackLocalContext) -> Result { + println!( + "TrackLocalSimple.bind: mid - {:?}; {:?}", + _t.mid(), + _t.ssrc() + ); + Ok(RTCRtpCodecParameters { + ..Default::default() + }) + } + + async fn unbind(&self, _t: &TrackLocalContext) -> Result<()> { + println!( + "TrackLocalSimple.unbind: mid-{:?}; {:?}", + _t.mid(), + _t.ssrc() + ); + Ok(()) + } + + fn id(&self) -> &str { + self.id.as_str() + } + + fn rid(&self) -> Option<&str> { + None + } + + fn stream_id(&self) -> &str { + self.stream_id.as_str() + } + + /// kind controls if this TrackLocal is audio or video + fn kind(&self) -> RTPCodecType { + self.kind.clone() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/webrtc/src/track/track_local/track_local_static_rtp.rs b/webrtc/src/track/track_local/track_local_static_rtp.rs index 44e6de8de..d0a0f3bf1 100644 --- a/webrtc/src/track/track_local/track_local_static_rtp.rs +++ b/webrtc/src/track/track_local/track_local_static_rtp.rs @@ -1,11 +1,87 @@ -use std::collections::HashMap; - use bytes::{Bytes, BytesMut}; -use tokio::sync::Mutex; +use rtp::packet::Packet; +use std::any::Any; +use std::sync::atomic::AtomicU64; +use std::{borrow::Cow, collections::HashMap, time::Duration}; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::{mpsc, Mutex}; use util::{Marshal, MarshalSize}; use super::*; -use crate::error::flatten_errs; +use crate::track::track_remote::TrackRemote; +use crate::{error::flatten_errs, track::track_local::packet_cache::PCacheBuffer}; + +#[derive(Debug)] +pub struct TrackState { + last_out_seq: u16, // переживает все переключения источников + last_out_ts: u32, // переживает все переключения источников + started_at_ts: i64, + + out_offset: Option<( + u16, /* смещение порядкового номера */ + u32, /* смещение временной метки timestamp */ + )>, +} + +impl TrackState { + pub fn new() -> Self { + TrackState { + // Порядковый номер начинается с 0 + last_out_seq: 0, + // время трека начинается с 0 + last_out_ts: 0, + // Сохраняем начало трека в реальной временной шкале дла последующей синхронизации + started_at_ts: chrono::Utc::now().timestamp(), + out_offset: None, + } + } + + pub fn apply_offset( + &mut self, + kind: RTPCodecType, + pkt_sequence_number: u16, + pkt_timestamp: u32, + ) -> (u16, u32) { + match self.out_offset { + Some((seq_num_offset, ts_offset)) => { + self.last_out_seq = pkt_sequence_number.wrapping_add(seq_num_offset); + self.last_out_ts = pkt_timestamp.wrapping_add(ts_offset); + (self.last_out_seq, self.last_out_ts) + } + None => { + let seq_num_offset = self + .last_out_seq + .wrapping_sub(pkt_sequence_number) + .wrapping_add(1); + let ts_offset = + self.last_out_ts + .wrapping_sub(pkt_timestamp) + .wrapping_add(match kind { + RTPCodecType::Audio => 900, // стандартное значение для звука + RTPCodecType::Video => 3750, // 90000 clock_rate / 24 кадра + _ => 3750, + }); + self.out_offset = Some((seq_num_offset, ts_offset)); + + self.last_out_seq = pkt_sequence_number.wrapping_add(seq_num_offset); + self.last_out_ts = pkt_timestamp.wrapping_add(ts_offset); + + println!( + "Смещения перезаписаны seq_num: {pkt_sequence_number} -> {}; ts: {pkt_timestamp} -> {}", + self.last_out_seq, self.last_out_ts + ); + (self.last_out_seq, self.last_out_ts) + } + } + } + + pub fn origin_seq(&self, modified_seq: u16) -> u16 { + match self.out_offset { + Some((seq_num_offset, _)) => modified_seq.wrapping_sub(seq_num_offset), + None => modified_seq, + } + } +} /// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets. /// If you wish to send a media.Sample use TrackLocalStaticSample @@ -16,8 +92,20 @@ pub struct TrackLocalStaticRTP { id: String, rid: Option, stream_id: String, + + pub state: Mutex, + pub rtp_cache: Arc, + + pli_last_ms: AtomicU64, + pli_interval_ms: u64, } +/// Количество пакетов в кэше +const CAPACITY: usize = 256; // если 24 пакета в секунду, то на 3 секунды нужно 72 ячейки кэша + +/// TTL в миллисекундах, время через которое кэш становится невалидным +const TTL_MILLIS: u64 = 3000; + impl TrackLocalStaticRTP { /// returns a TrackLocalStaticRTP without rid. pub fn new(codec: RTCRtpCodecCapability, id: String, stream_id: String) -> Self { @@ -27,6 +115,15 @@ impl TrackLocalStaticRTP { id, rid: None, stream_id, + + state: Mutex::new(TrackState::new()), + rtp_cache: Arc::new(PCacheBuffer::new( + Duration::from_millis(TTL_MILLIS), + CAPACITY, + )), + + pli_last_ms: AtomicU64::new(0), + pli_interval_ms: 500, } } @@ -43,6 +140,15 @@ impl TrackLocalStaticRTP { id, rid: Some(rid), stream_id, + + state: Mutex::new(TrackState::new()), + rtp_cache: Arc::new(PCacheBuffer::new( + Duration::from_millis(TTL_MILLIS), + CAPACITY, + )), + + pli_last_ms: AtomicU64::new(0), + pli_interval_ms: 500, } } @@ -65,37 +171,147 @@ impl TrackLocalStaticRTP { .all(|b| b.sender_paused.load(Ordering::SeqCst)) } - /// write_rtp_with_extensions writes a RTP Packet to the TrackLocalStaticRTP - /// If one PeerConnection fails the packets will still be sent to - /// all PeerConnections. The error message will contain the ID of the failed - /// PeerConnections so you can remove them - /// - /// If the RTCRtpSender direction is such that no packets should be sent, any call to this - /// function are blocked internally. Care must be taken to not increase the sequence number - /// while the sender is paused. While the actual _sending_ is blocked, the receiver will - /// miss out when the sequence number "rolls over", which in turn will break SRTP. - /// - /// Extensions that are already configured on the packet are overwritten by extensions in - /// `extensions`. - pub async fn write_rtp_with_extensions( + pub async fn is_binding_active(&self, binding_ssrc: u32) -> bool { + match { + let bindings = self.bindings.lock().await; + bindings + .iter() + .find(|b| b.ssrc == binding_ssrc) + .map(|b| b.clone()) + } { + Some(b) => !b.is_sender_paused(), + None => false, + } + } + + /// Выполняется, когда мы изменяем источник данных для трека + pub async fn replace_remote(self: Arc, remote_track: Arc) { + // 1. Приводим исходящее смещение к начальному состоянию, + // чтоб определить его заново в момент первого пришедшего пакета + { + let mut s = self.state.lock().await; + s.out_offset = None; + } + + // 2. Запись из mpsc канала в local_track + // здесь должен быть минимальный буфер, + // т.к. лучше потом отправить из кеша, чем пытаться отправить застрявший пакет из очереди + let (rtp_sender, mut rtp_rx) = mpsc::channel::(64); + let local_track = Arc::downgrade(&self); + let rtp_writer = tokio::spawn(async move { + while let Some(pkt) = rtp_rx.recv().await { + if let Some(local_track) = local_track.upgrade() { + if let Err(err) = local_track.write_rtp(&pkt).await { + eprintln!("Ошибка записи данных в исходящий трек: {:?}", err); + } + } else { + break; + } + } + println!("Запись данных в трек остановлена!"); + }); + + // 3. Чтение из remote_track в mpsc канал + while let Ok(rtp) = remote_track.read_rtp().await { + // 1. Сохраняем в кэш оригинальный rtp без смещений! Так быстрее происходит сохранение в кэш + // При восстановлении кеша нужно вернуть порядковый номер к оригинальному, чтоб найти его + self.rtp_cache.put(rtp.clone()); + + // 2. Пытаемся отправить, если переполнен буфер, не ждём и позже в ответ на NACK берём из кэша + // Без ожиданий, чтоб не замедлять процесс получения пакетов + match rtp_sender.try_send(rtp) { + Err(TrySendError::Closed(_)) => { + break; + } + Err(TrySendError::Full(_)) => { + eprintln!("Ошибка отправки RTP данных: Буфер переполнен"); + } + _ => {} + } + } + + // 4. Если remote_track перестал слать пакеты, то перестаём и записывать их + rtp_writer.abort(); + } + + /// Получаем ssrc всех RTCPeerConnection подключений к этому треку + pub async fn bindings_ssrc(&self) -> Vec { + let bindings = self.bindings.lock().await; + bindings.iter().map(|b| b.ssrc).collect() + } + + pub async fn bindings_ids(&self) -> Vec { + let bindings = self.bindings.lock().await; + bindings.iter().map(|b| b.id.clone()).collect() + } + + pub async fn write_rtp_with_extensions_to( &self, p: &rtp::packet::Packet, extensions: &[rtp::extension::HeaderExtension], + binding_ssrc: u32, ) -> Result { - let attr = Attributes::new(); - self.write_rtp_with_extensions_attributes(p, extensions, &attr) - .await + let binding = { + let bindings = self.bindings.lock().await; + bindings + .iter() + .find(|b| b.ssrc == binding_ssrc) + .map(|b| b.clone()) + }; + + if let Some(b) = binding { + // Prepare the extensions data + let mut extension_error = None; + let extension_data: HashMap<_, _> = extensions + .iter() + .flat_map(|extension| { + let buf = { + let mut buf = BytesMut::with_capacity(extension.marshal_size()); + buf.resize(extension.marshal_size(), 0); + if let Err(err) = extension.marshal_to(&mut buf) { + extension_error = Some(Error::Util(err)); + return None; + } + + buf.freeze() + }; + + Some((extension.uri(), buf)) + }) + .collect(); + if let Some(err) = extension_error { + return Err(err); + } + + self.write_rtp_with_extensions_to_binding(p, &extension_data, b) + .await + } else { + // Must return Ok(usize) to be consistent with write_rtp_with_extensions_attributes + Err(Error::LocalTrackBindingNotFound) + } } - pub async fn write_rtp_with_extensions_attributes( + pub async fn write_rtp_with_extensions( &self, - p: &rtp::packet::Packet, + pkt: &rtp::packet::Packet, extensions: &[rtp::extension::HeaderExtension], - attr: &Attributes, ) -> Result { + let mut pkt = pkt.clone(); + + let (seq_number, timestamp) = { + let mut st = self.state.lock().await; + st.apply_offset( + self.kind(), + pkt.header.sequence_number, + pkt.header.timestamp, + ) + }; + + pkt.header.sequence_number = seq_number; + pkt.header.timestamp = timestamp; + let mut n = 0; let mut write_errs = vec![]; - let mut pkt = p.clone(); let bindings = { let bindings = self.bindings.lock().await; @@ -121,47 +337,121 @@ impl TrackLocalStaticRTP { .collect(); for b in bindings.into_iter() { - if b.is_sender_paused() { - // See caveat in function doc. - continue; + match self + .write_rtp_with_extensions_to_binding(&pkt, &extension_data, b) + .await + { + Ok(one_or_zero) => { + n += one_or_zero; + } + Err(err) => { + write_errs.push(err); + } } - pkt.header.ssrc = b.ssrc; - pkt.header.payload_type = b.payload_type; + } - for ext in b.hdr_ext_ids.iter() { - let payload = ext.payload.to_owned(); - if let Err(err) = pkt.header.set_extension(ext.id, payload) { - write_errs.push(Error::Rtp(err)); - } + flatten_errs(write_errs)?; + Ok(n) + } + + pub async fn write_rtp_to( + &self, + pkt: &rtp::packet::Packet, + binding_ssrc: u32, + ) -> Result { + let mut pkt = pkt.clone(); + + let (seq_number, timestamp) = { + let mut st = self.state.lock().await; + st.apply_offset( + self.kind(), + pkt.header.sequence_number, + pkt.header.timestamp, + ) + }; + + pkt.header.sequence_number = seq_number; + pkt.header.timestamp = timestamp; + self.write_rtp_with_extensions_to(&pkt, &[], binding_ssrc) + .await + } + + pub async fn set_muted(&self, muted: bool) { + let bindings = { + let bindings = self.bindings.lock().await; + bindings.clone() + }; + bindings.iter().for_each(|b| { + b.set_sender_paused(muted); + }); + } + + pub async fn set_muted_for(&self, bindings_ssrc: Vec<(u32, bool)>) { + let bindings = { + let bindings = self.bindings.lock().await; + bindings.clone() + }; + bindings.iter().for_each(|b| { + if let Some((_, muted)) = bindings_ssrc.iter().find(|(ssrc, _)| *ssrc == b.ssrc) { + b.set_sender_paused(*muted); } + }); + } - for (uri, data) in extension_data.iter() { - if let Some(id) = b - .params - .header_extensions - .iter() - .find(|ext| &ext.uri == uri) - .map(|ext| ext.id) - { - if let Err(err) = pkt.header.set_extension(id as u8, data.clone()) { - write_errs.push(Error::Rtp(err)); - continue; - } - } + pub fn should_fire_pli(&self, now_ms: u64) -> bool { + loop { + let prev = self.pli_last_ms.load(Ordering::Relaxed); + if now_ms.saturating_sub(prev) < self.pli_interval_ms { + return false; + } + if self + .pli_last_ms + .compare_exchange(prev, now_ms, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + return true; } + // кто-то другой успел обновить last_ms — пробуем снова + } + } - match b.write_stream.write_rtp_with_attributes(&pkt, attr).await { - Ok(m) => { - n += m; - } - Err(err) => { - write_errs.push(err); + async fn write_rtp_with_extensions_to_binding( + &self, + p: &rtp::packet::Packet, + extension_data: &HashMap, Bytes>, + binidng: Arc, + ) -> Result { + let mut pkt = p.clone(); + + if binidng.is_sender_paused() { + // See caveat in function doc. + return Ok(0); + } + pkt.header.ssrc = binidng.ssrc; + pkt.header.payload_type = binidng.payload_type; + + for ext in binidng.hdr_ext_ids.iter() { + let payload = ext.payload.to_owned(); + if let Err(err) = pkt.header.set_extension(ext.id, payload) { + return Err(Error::Rtp(err)); + } + } + + for (uri, data) in extension_data.iter() { + if let Some(id) = binidng + .params + .header_extensions + .iter() + .find(|ext| &ext.uri == uri) + .map(|ext| ext.id) + { + if let Err(err) = pkt.header.set_extension(id as u8, data.clone()) { + return Err(Error::Rtp(err)); } } } - flatten_errs(write_errs)?; - Ok(n) + binidng.write_stream.write_rtp(&pkt).await } } @@ -171,6 +461,11 @@ impl TrackLocal for TrackLocalStaticRTP { /// This asserts that the code requested is supported by the remote peer. /// If so it setups all the state (SSRC and PayloadType) to have a call async fn bind(&self, t: &TrackLocalContext) -> Result { + println!( + "TrackLocalStaticRTP.bind: mid={:?}; ssrc={:?}", + t.mid(), + t.ssrc() + ); let parameters = RTCRtpCodecParameters { capability: self.codec.clone(), ..Default::default() @@ -226,6 +521,11 @@ impl TrackLocal for TrackLocalStaticRTP { /// unbind implements the teardown logic when the track is no longer needed. This happens /// because a track has been stopped. async fn unbind(&self, t: &TrackLocalContext) -> Result<()> { + println!( + "TrackLocalStaticRTP.unbind: mid={:?}; ssrc={:?}", + t.mid(), + t.ssrc() + ); let mut bindings = self.bindings.lock().await; let mut idx = None; for (index, binding) in bindings.iter().enumerate() { @@ -277,7 +577,7 @@ impl TrackLocal for TrackLocalStaticRTP { #[async_trait] impl TrackLocalWriter for TrackLocalStaticRTP { - /// `write_rtp_with_attributes` writes a RTP Packet to the TrackLocalStaticRTP + /// `write_rtp` writes a RTP Packet to the TrackLocalStaticRTP /// If one PeerConnection fails the packets will still be sent to /// all PeerConnections. The error message will contain the ID of the failed /// PeerConnections so you can remove them @@ -286,13 +586,8 @@ impl TrackLocalWriter for TrackLocalStaticRTP { /// function are blocked internally. Care must be taken to not increase the sequence number /// while the sender is paused. While the actual _sending_ is blocked, the receiver will /// miss out when the sequence number "rolls over", which in turn will break SRTP. - async fn write_rtp_with_attributes( - &self, - pkt: &rtp::packet::Packet, - attr: &Attributes, - ) -> Result { - self.write_rtp_with_extensions_attributes(pkt, &[], attr) - .await + async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result { + self.write_rtp_with_extensions(pkt, &[]).await } /// write writes a RTP Packet as a buffer to the TrackLocalStaticRTP diff --git a/webrtc/src/track/track_remote/mod.rs b/webrtc/src/track/track_remote/mod.rs index 7d0565cf9..64c6ccb47 100644 --- a/webrtc/src/track/track_remote/mod.rs +++ b/webrtc/src/track/track_remote/mod.rs @@ -5,7 +5,7 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use arc_swap::ArcSwapOption; -use interceptor::{Attributes, Interceptor}; +use interceptor::Interceptor; use portable_atomic::{AtomicU32, AtomicU8, AtomicUsize}; use smol_str::SmolStr; use tokio::sync::Mutex; @@ -32,7 +32,7 @@ struct Handlers { #[derive(Default)] struct TrackRemoteInternal { - peeked: VecDeque<(rtp::packet::Packet, Attributes)>, + peeked: VecDeque, } /// TrackRemote represents a single inbound source of media @@ -214,14 +214,14 @@ impl TrackRemote { /// /// **Cancel Safety:** This method is not cancel safe. Dropping the resulting [`Future`] before /// it returns [`std::task::Poll::Ready`] will cause data loss. - pub async fn read(&self, b: &mut [u8]) -> Result<(rtp::packet::Packet, Attributes)> { + pub async fn read(&self, b: &mut [u8]) -> Result { { // Internal lock scope let mut internal = self.internal.lock().await; - if let Some((pkt, attributes)) = internal.peeked.pop_front() { + if let Some(pkt) = internal.peeked.pop_front() { self.check_and_update_track(&pkt).await?; - return Ok((pkt, attributes)); + return Ok(pkt); } }; @@ -230,9 +230,9 @@ impl TrackRemote { None => return Err(Error::ErrRTPReceiverNil), }; - let (pkt, attributes) = receiver.read_rtp(b, self.tid).await?; + let pkt = receiver.read_rtp(b, self.tid).await?; self.check_and_update_track(&pkt).await?; - Ok((pkt, attributes)) + Ok(pkt) } /// check_and_update_track checks payloadType for every incoming packet @@ -269,25 +269,25 @@ impl TrackRemote { } /// read_rtp is a convenience method that wraps Read and unmarshals for you. - pub async fn read_rtp(&self) -> Result<(rtp::packet::Packet, Attributes)> { + pub async fn read_rtp(&self) -> Result { let mut b = vec![0u8; self.receive_mtu]; - let (pkt, attributes) = self.read(&mut b).await?; + let pkt = self.read(&mut b).await?; - Ok((pkt, attributes)) + Ok(pkt) } /// peek is like Read, but it doesn't discard the packet read - pub(crate) async fn peek(&self, b: &mut [u8]) -> Result<(rtp::packet::Packet, Attributes)> { - let (pkt, a) = self.read(b).await?; + pub(crate) async fn peek(&self, b: &mut [u8]) -> Result { + let pkt = self.read(b).await?; // this might overwrite data if somebody peeked between the Read // and us getting the lock. Oh well, we'll just drop a packet in // that case. { let mut internal = self.internal.lock().await; - internal.peeked.push_back((pkt.clone(), a.clone())); + internal.peeked.push_back(pkt.clone()); } - Ok((pkt, a)) + Ok(pkt) } /// Set the initially peeked data for this track. @@ -295,10 +295,7 @@ impl TrackRemote { /// This is useful when a track is first created to populate data read from the track in the /// process of identifying the track as part of simulcast probing. Using this during other /// parts of the track's lifecycle is probably an error. - pub(crate) async fn prepopulate_peeked_data( - &self, - data: VecDeque<(rtp::packet::Packet, Attributes)>, - ) { + pub(crate) async fn prepopulate_peeked_data(&self, data: VecDeque) { let mut internal = self.internal.lock().await; internal.peeked = data; }