From c978a19be069221dab20eb88891fa9e44de8796e Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 22 Oct 2025 18:52:52 +0200 Subject: [PATCH 1/4] feat: Make max channel size of transport thread configurable --- sentry-core/src/clientoptions.rs | 7 ++ sentry/src/transports/curl.rs | 166 +++++++++++++------------- sentry/src/transports/reqwest.rs | 73 +++++------ sentry/src/transports/thread.rs | 6 +- sentry/src/transports/tokio_thread.rs | 7 +- sentry/src/transports/ureq.rs | 62 +++++----- 6 files changed, 172 insertions(+), 149 deletions(-) diff --git a/sentry-core/src/clientoptions.rs b/sentry-core/src/clientoptions.rs index 33b5bbd4c..8e5c4138d 100644 --- a/sentry-core/src/clientoptions.rs +++ b/sentry-core/src/clientoptions.rs @@ -196,6 +196,8 @@ pub struct ClientOptions { pub trim_backtraces: bool, /// The user agent that should be reported. pub user_agent: Cow<'static, str>, + /// The maximum number of envelopes that can be enqueued for sending in the transport channel (defaults to 30). + pub max_transport_channel_size: usize, } impl ClientOptions { @@ -287,6 +289,10 @@ impl fmt::Debug for ClientOptions { .field("extra_border_frames", &self.extra_border_frames) .field("trim_backtraces", &self.trim_backtraces) .field("user_agent", &self.user_agent) + .field( + "max_transport_channel_size", + &self.max_transport_channel_size, + ) .finish() } } @@ -328,6 +334,7 @@ impl Default for ClientOptions { enable_logs: true, #[cfg(feature = "logs")] before_send_log: None, + max_transport_channel_size: 30, } } } diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index 7cf5a2b42..d4e4ed0c9 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -38,96 +38,100 @@ impl CurlHttpTransport { let accept_invalid_certs = options.accept_invalid_certs; let mut handle = client; - let thread = TransportThread::new(move |envelope, rl| { - handle.reset(); - handle.url(&url).unwrap(); - handle.custom_request("POST").unwrap(); - - if accept_invalid_certs { - handle.ssl_verify_host(false).unwrap(); - handle.ssl_verify_peer(false).unwrap(); - } - - match (scheme, &http_proxy, &https_proxy) { - (Scheme::Https, _, Some(proxy)) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); - } + let thread = TransportThread::new( + move |envelope, rl| { + handle.reset(); + handle.url(&url).unwrap(); + handle.custom_request("POST").unwrap(); + + if accept_invalid_certs { + handle.ssl_verify_host(false).unwrap(); + handle.ssl_verify_peer(false).unwrap(); } - (_, Some(proxy), _) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); + + match (scheme, &http_proxy, &https_proxy) { + (Scheme::Https, _, Some(proxy)) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); + } + } + (_, Some(proxy), _) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); + } } + _ => {} } - _ => {} - } - - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let mut body = Cursor::new(body); - - let mut retry_after = None; - let mut sentry_header = None; - let mut headers = curl::easy::List::new(); - headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); - headers.append("Expect:").unwrap(); - handle.http_headers(headers).unwrap(); - handle.upload(true).unwrap(); - handle.in_filesize(body.get_ref().len() as u64).unwrap(); - handle - .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) - .unwrap(); - handle.verbose(true).unwrap(); - handle - .debug_function(move |info, data| { - let prefix = match info { - curl::easy::InfoType::HeaderIn => "< ", - curl::easy::InfoType::HeaderOut => "> ", - curl::easy::InfoType::DataOut => "", - _ => return, - }; - sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); - }) - .unwrap(); - - { - let mut handle = handle.transfer(); - let retry_after_setter = &mut retry_after; - let sentry_header_setter = &mut sentry_header; + + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let mut body = Cursor::new(body); + + let mut retry_after = None; + let mut sentry_header = None; + let mut headers = curl::easy::List::new(); + headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); + headers.append("Expect:").unwrap(); + handle.http_headers(headers).unwrap(); + handle.upload(true).unwrap(); + handle.in_filesize(body.get_ref().len() as u64).unwrap(); + handle + .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) + .unwrap(); + handle.verbose(true).unwrap(); handle - .header_function(move |data| { - if let Ok(data) = std::str::from_utf8(data) { - let mut iter = data.split(':'); - if let Some(key) = iter.next().map(str::to_lowercase) { - if key == "retry-after" { - *retry_after_setter = iter.next().map(|x| x.trim().to_string()); - } else if key == "x-sentry-rate-limits" { - *sentry_header_setter = - iter.next().map(|x| x.trim().to_string()); + .debug_function(move |info, data| { + let prefix = match info { + curl::easy::InfoType::HeaderIn => "< ", + curl::easy::InfoType::HeaderOut => "> ", + curl::easy::InfoType::DataOut => "", + _ => return, + }; + sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); + }) + .unwrap(); + + { + let mut handle = handle.transfer(); + let retry_after_setter = &mut retry_after; + let sentry_header_setter = &mut sentry_header; + handle + .header_function(move |data| { + if let Ok(data) = std::str::from_utf8(data) { + let mut iter = data.split(':'); + if let Some(key) = iter.next().map(str::to_lowercase) { + if key == "retry-after" { + *retry_after_setter = + iter.next().map(|x| x.trim().to_string()); + } else if key == "x-sentry-rate-limits" { + *sentry_header_setter = + iter.next().map(|x| x.trim().to_string()); + } } } + true + }) + .unwrap(); + handle.perform().ok(); + } + + match handle.response_code() { + Ok(response_code) => { + if let Some(sentry_header) = sentry_header { + rl.update_from_sentry_header(&sentry_header); + } else if let Some(retry_after) = retry_after { + rl.update_from_retry_after(&retry_after); + } else if response_code == 429 { + rl.update_from_429(); } - true - }) - .unwrap(); - handle.perform().ok(); - } - - match handle.response_code() { - Ok(response_code) => { - if let Some(sentry_header) = sentry_header { - rl.update_from_sentry_header(&sentry_header); - } else if let Some(retry_after) = retry_after { - rl.update_from_retry_after(&retry_after); - } else if response_code == 429 { - rl.update_from_429(); + } + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } - } - }); + }, + options, + ); Self { thread } } } diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index 9e21364a0..c637915aa 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -62,48 +62,51 @@ impl ReqwestHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new(move |envelope, mut rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); + let thread = TransportThread::new( + move |envelope, mut rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); - // NOTE: because of lifetime issues, building the request using the - // `client` has to happen outside of this async block. - async move { - match request.send().await { - Ok(response) => { - let headers = response.headers(); + // NOTE: because of lifetime issues, building the request using the + // `client` has to happen outside of this async block. + async move { + match request.send().await { + Ok(response) => { + let headers = response.headers(); - if let Some(sentry_header) = headers - .get("x-sentry-rate-limits") - .and_then(|x| x.to_str().ok()) - { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = headers - .get(ReqwestHeaders::RETRY_AFTER) - .and_then(|x| x.to_str().ok()) - { - rl.update_from_retry_after(retry_after); - } else if response.status() == StatusCode::TOO_MANY_REQUESTS { - rl.update_from_429(); - } - - match response.text().await { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); + if let Some(sentry_header) = headers + .get("x-sentry-rate-limits") + .and_then(|x| x.to_str().ok()) + { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = headers + .get(ReqwestHeaders::RETRY_AFTER) + .and_then(|x| x.to_str().ok()) + { + rl.update_from_retry_after(retry_after); + } else if response.status() == StatusCode::TOO_MANY_REQUESTS { + rl.update_from_429(); } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); + + match response.text().await { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); + } } } + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); + } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } + rl } - rl - } - }); + }, + options, + ); Self { thread } } } diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 6ccca6fae..dd6c8cf81 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -4,6 +4,8 @@ use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::time::Duration; +use sentry_core::ClientOptions; + use super::ratelimit::{RateLimiter, RateLimitingCategory}; use crate::{sentry_debug, Envelope}; @@ -25,11 +27,11 @@ pub struct TransportThread { } impl TransportThread { - pub fn new(mut send: SendFn) -> Self + pub fn new(mut send: SendFn, options: &ClientOptions) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - let (sender, receiver) = sync_channel(30); + let (sender, receiver) = sync_channel(options.max_transport_channel_size); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 5ef734e31..9393d9313 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -4,6 +4,8 @@ use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::time::Duration; +use sentry_core::ClientOptions; + use super::ratelimit::{RateLimiter, RateLimitingCategory}; use crate::{sentry_debug, Envelope}; @@ -25,13 +27,12 @@ pub struct TransportThread { } impl TransportThread { - pub fn new(mut send: SendFn) -> Self + pub fn new(mut send: SendFn, options: &ClientOptions) -> Self where SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, - // NOTE: returning RateLimiter here, otherwise we are in borrow hell SendFuture: std::future::Future, { - let (sender, receiver) = sync_channel(30); + let (sender, receiver) = sync_channel(options.max_transport_channel_size); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index 9fa20fe4b..e109ccc9f 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -83,39 +83,45 @@ impl UreqHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new(move |envelope, rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); - - match request { - Ok(mut response) => { - fn header_str<'a, B>(response: &'a Response, key: &str) -> Option<&'a str> { - response.headers().get(key)?.to_str().ok() - } - - if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = header_str(&response, "retry-after") { - rl.update_from_retry_after(retry_after); - } else if response.status() == 429 { - rl.update_from_429(); - } + let thread = TransportThread::new( + move |envelope, rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); + + match request { + Ok(mut response) => { + fn header_str<'a, B>( + response: &'a Response, + key: &str, + ) -> Option<&'a str> { + response.headers().get(key)?.to_str().ok() + } - match response.body_mut().read_to_string() { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); + if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = header_str(&response, "retry-after") { + rl.update_from_retry_after(retry_after); + } else if response.status() == 429 { + rl.update_from_429(); } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); + + match response.body_mut().read_to_string() { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); + } } } + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); + } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } - } - }); + }, + options, + ); Self { thread } } } From f15c102b07f0611c1dd53b0d20aacfd3f1f0b1fd Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 22 Oct 2025 18:56:32 +0200 Subject: [PATCH 2/4] make options the first param --- sentry/src/transports/curl.rs | 2 +- sentry/src/transports/reqwest.rs | 2 +- sentry/src/transports/thread.rs | 2 +- sentry/src/transports/tokio_thread.rs | 2 +- sentry/src/transports/ureq.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index d4e4ed0c9..509db4395 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -39,6 +39,7 @@ impl CurlHttpTransport { let mut handle = client; let thread = TransportThread::new( + options, move |envelope, rl| { handle.reset(); handle.url(&url).unwrap(); @@ -130,7 +131,6 @@ impl CurlHttpTransport { } } }, - options, ); Self { thread } } diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index c637915aa..ace68417f 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -63,6 +63,7 @@ impl ReqwestHttpTransport { let url = dsn.envelope_api_url().to_string(); let thread = TransportThread::new( + options, move |envelope, mut rl| { let mut body = Vec::new(); envelope.to_writer(&mut body).unwrap(); @@ -105,7 +106,6 @@ impl ReqwestHttpTransport { rl } }, - options, ); Self { thread } } diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index dd6c8cf81..efb384a42 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -27,7 +27,7 @@ pub struct TransportThread { } impl TransportThread { - pub fn new(mut send: SendFn, options: &ClientOptions) -> Self + pub fn new(options: &ClientOptions, mut send: SendFn) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 9393d9313..1bc0482b2 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -27,7 +27,7 @@ pub struct TransportThread { } impl TransportThread { - pub fn new(mut send: SendFn, options: &ClientOptions) -> Self + pub fn new(options: &ClientOptions, mut send: SendFn) -> Self where SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, SendFuture: std::future::Future, diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index e109ccc9f..019cdf815 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -84,6 +84,7 @@ impl UreqHttpTransport { let url = dsn.envelope_api_url().to_string(); let thread = TransportThread::new( + options, move |envelope, rl| { let mut body = Vec::new(); envelope.to_writer(&mut body).unwrap(); @@ -120,7 +121,6 @@ impl UreqHttpTransport { } } }, - options, ); Self { thread } } From 6e8b7da89cc34e03219b66837049348920df9124 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 22 Oct 2025 19:00:49 +0200 Subject: [PATCH 3/4] rename it --- sentry-core/src/clientoptions.rs | 8 ++++---- sentry/src/transports/thread.rs | 2 +- sentry/src/transports/tokio_thread.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sentry-core/src/clientoptions.rs b/sentry-core/src/clientoptions.rs index 8e5c4138d..3aaaa2379 100644 --- a/sentry-core/src/clientoptions.rs +++ b/sentry-core/src/clientoptions.rs @@ -197,7 +197,7 @@ pub struct ClientOptions { /// The user agent that should be reported. pub user_agent: Cow<'static, str>, /// The maximum number of envelopes that can be enqueued for sending in the transport channel (defaults to 30). - pub max_transport_channel_size: usize, + pub transport_channel_capacity: usize, } impl ClientOptions { @@ -290,8 +290,8 @@ impl fmt::Debug for ClientOptions { .field("trim_backtraces", &self.trim_backtraces) .field("user_agent", &self.user_agent) .field( - "max_transport_channel_size", - &self.max_transport_channel_size, + "transport_channel_capacity", + &self.transport_channel_capacity, ) .finish() } @@ -334,7 +334,7 @@ impl Default for ClientOptions { enable_logs: true, #[cfg(feature = "logs")] before_send_log: None, - max_transport_channel_size: 30, + transport_channel_capacity: 30, } } } diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index efb384a42..9f842bdc7 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -31,7 +31,7 @@ impl TransportThread { where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - let (sender, receiver) = sync_channel(options.max_transport_channel_size); + let (sender, receiver) = sync_channel(options.transport_channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 1bc0482b2..b2d8ef045 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -32,7 +32,7 @@ impl TransportThread { SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, SendFuture: std::future::Future, { - let (sender, receiver) = sync_channel(options.max_transport_channel_size); + let (sender, receiver) = sync_channel(options.transport_channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() From d9fc76403fd6366511fb880bb25c0be5c5aa157d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 22 Oct 2025 19:05:42 +0200 Subject: [PATCH 4/4] lints --- sentry/src/transports/curl.rs | 166 +++++++++++++++---------------- sentry/src/transports/reqwest.rs | 73 +++++++------- sentry/src/transports/ureq.rs | 62 ++++++------ 3 files changed, 144 insertions(+), 157 deletions(-) diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index 509db4395..7ba3b56e9 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -38,100 +38,96 @@ impl CurlHttpTransport { let accept_invalid_certs = options.accept_invalid_certs; let mut handle = client; - let thread = TransportThread::new( - options, - move |envelope, rl| { - handle.reset(); - handle.url(&url).unwrap(); - handle.custom_request("POST").unwrap(); - - if accept_invalid_certs { - handle.ssl_verify_host(false).unwrap(); - handle.ssl_verify_peer(false).unwrap(); - } - - match (scheme, &http_proxy, &https_proxy) { - (Scheme::Https, _, Some(proxy)) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); - } + let thread = TransportThread::new(options, move |envelope, rl| { + handle.reset(); + handle.url(&url).unwrap(); + handle.custom_request("POST").unwrap(); + + if accept_invalid_certs { + handle.ssl_verify_host(false).unwrap(); + handle.ssl_verify_peer(false).unwrap(); + } + + match (scheme, &http_proxy, &https_proxy) { + (Scheme::Https, _, Some(proxy)) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); } - (_, Some(proxy), _) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); - } + } + (_, Some(proxy), _) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); } - _ => {} } - - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let mut body = Cursor::new(body); - - let mut retry_after = None; - let mut sentry_header = None; - let mut headers = curl::easy::List::new(); - headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); - headers.append("Expect:").unwrap(); - handle.http_headers(headers).unwrap(); - handle.upload(true).unwrap(); - handle.in_filesize(body.get_ref().len() as u64).unwrap(); - handle - .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) - .unwrap(); - handle.verbose(true).unwrap(); + _ => {} + } + + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let mut body = Cursor::new(body); + + let mut retry_after = None; + let mut sentry_header = None; + let mut headers = curl::easy::List::new(); + headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); + headers.append("Expect:").unwrap(); + handle.http_headers(headers).unwrap(); + handle.upload(true).unwrap(); + handle.in_filesize(body.get_ref().len() as u64).unwrap(); + handle + .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) + .unwrap(); + handle.verbose(true).unwrap(); + handle + .debug_function(move |info, data| { + let prefix = match info { + curl::easy::InfoType::HeaderIn => "< ", + curl::easy::InfoType::HeaderOut => "> ", + curl::easy::InfoType::DataOut => "", + _ => return, + }; + sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); + }) + .unwrap(); + + { + let mut handle = handle.transfer(); + let retry_after_setter = &mut retry_after; + let sentry_header_setter = &mut sentry_header; handle - .debug_function(move |info, data| { - let prefix = match info { - curl::easy::InfoType::HeaderIn => "< ", - curl::easy::InfoType::HeaderOut => "> ", - curl::easy::InfoType::DataOut => "", - _ => return, - }; - sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); - }) - .unwrap(); - - { - let mut handle = handle.transfer(); - let retry_after_setter = &mut retry_after; - let sentry_header_setter = &mut sentry_header; - handle - .header_function(move |data| { - if let Ok(data) = std::str::from_utf8(data) { - let mut iter = data.split(':'); - if let Some(key) = iter.next().map(str::to_lowercase) { - if key == "retry-after" { - *retry_after_setter = - iter.next().map(|x| x.trim().to_string()); - } else if key == "x-sentry-rate-limits" { - *sentry_header_setter = - iter.next().map(|x| x.trim().to_string()); - } + .header_function(move |data| { + if let Ok(data) = std::str::from_utf8(data) { + let mut iter = data.split(':'); + if let Some(key) = iter.next().map(str::to_lowercase) { + if key == "retry-after" { + *retry_after_setter = iter.next().map(|x| x.trim().to_string()); + } else if key == "x-sentry-rate-limits" { + *sentry_header_setter = + iter.next().map(|x| x.trim().to_string()); } } - true - }) - .unwrap(); - handle.perform().ok(); - } - - match handle.response_code() { - Ok(response_code) => { - if let Some(sentry_header) = sentry_header { - rl.update_from_sentry_header(&sentry_header); - } else if let Some(retry_after) = retry_after { - rl.update_from_retry_after(&retry_after); - } else if response_code == 429 { - rl.update_from_429(); } + true + }) + .unwrap(); + handle.perform().ok(); + } + + match handle.response_code() { + Ok(response_code) => { + if let Some(sentry_header) = sentry_header { + rl.update_from_sentry_header(&sentry_header); + } else if let Some(retry_after) = retry_after { + rl.update_from_retry_after(&retry_after); + } else if response_code == 429 { + rl.update_from_429(); } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } } - }, - ); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); + } + } + }); Self { thread } } } diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index ace68417f..6db1f749e 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -62,51 +62,48 @@ impl ReqwestHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new( - options, - move |envelope, mut rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); + let thread = TransportThread::new(options, move |envelope, mut rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); - // NOTE: because of lifetime issues, building the request using the - // `client` has to happen outside of this async block. - async move { - match request.send().await { - Ok(response) => { - let headers = response.headers(); + // NOTE: because of lifetime issues, building the request using the + // `client` has to happen outside of this async block. + async move { + match request.send().await { + Ok(response) => { + let headers = response.headers(); - if let Some(sentry_header) = headers - .get("x-sentry-rate-limits") - .and_then(|x| x.to_str().ok()) - { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = headers - .get(ReqwestHeaders::RETRY_AFTER) - .and_then(|x| x.to_str().ok()) - { - rl.update_from_retry_after(retry_after); - } else if response.status() == StatusCode::TOO_MANY_REQUESTS { - rl.update_from_429(); - } + if let Some(sentry_header) = headers + .get("x-sentry-rate-limits") + .and_then(|x| x.to_str().ok()) + { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = headers + .get(ReqwestHeaders::RETRY_AFTER) + .and_then(|x| x.to_str().ok()) + { + rl.update_from_retry_after(retry_after); + } else if response.status() == StatusCode::TOO_MANY_REQUESTS { + rl.update_from_429(); + } - match response.text().await { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); - } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); - } + match response.text().await { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); } - } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); } } - rl + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); + } } - }, - ); + rl + } + }); Self { thread } } } diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index 019cdf815..938e4d14a 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -83,45 +83,39 @@ impl UreqHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new( - options, - move |envelope, rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); - - match request { - Ok(mut response) => { - fn header_str<'a, B>( - response: &'a Response, - key: &str, - ) -> Option<&'a str> { - response.headers().get(key)?.to_str().ok() - } + let thread = TransportThread::new(options, move |envelope, rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); + + match request { + Ok(mut response) => { + fn header_str<'a, B>(response: &'a Response, key: &str) -> Option<&'a str> { + response.headers().get(key)?.to_str().ok() + } - if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = header_str(&response, "retry-after") { - rl.update_from_retry_after(retry_after); - } else if response.status() == 429 { - rl.update_from_429(); - } + if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = header_str(&response, "retry-after") { + rl.update_from_retry_after(retry_after); + } else if response.status() == 429 { + rl.update_from_429(); + } - match response.body_mut().read_to_string() { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); - } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); - } + match response.body_mut().read_to_string() { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } } - }, - ); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); + } + } + }); Self { thread } } }