diff --git a/Cargo.lock b/Cargo.lock index 5a855ad89a..b5dbbdb225 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1806,6 +1806,7 @@ dependencies = [ "linkerd-stack", "linkerd-tracing", "parking_lot", + "pin-project", "thiserror 2.0.11", "tokio", "tower", diff --git a/linkerd/http/retry/Cargo.toml b/linkerd/http/retry/Cargo.toml index a59da0d9ab..09e15bd356 100644 --- a/linkerd/http/retry/Cargo.toml +++ b/linkerd/http/retry/Cargo.toml @@ -12,6 +12,7 @@ futures = { version = "0.3", default-features = false } http-body = { workspace = true } http = { workspace = true } parking_lot = "0.12" +pin-project = "1" tokio = { version = "1", features = ["macros", "rt"] } tower = { version = "0.4", features = ["retry"] } tracing = "0.1" diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index 868857b71f..fd1a8a3589 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -4,6 +4,7 @@ use futures::{ }; use http_body::Body; use linkerd_http_box::BoxBody; +use pin_project::pin_project; use std::{ future::Future, pin::Pin, @@ -15,7 +16,12 @@ use std::{ /// /// If the first frame of the body stream was *not* a `TRAILERS` frame, this /// behaves identically to a normal body. +#[pin_project(project = Projection)] pub struct PeekTrailersBody { + /// The inner [`Body`]. + /// + /// This is the request or response body whose trailers are being peeked. + #[pin] inner: B, /// The first DATA frame received from the inner body, or an error that @@ -38,14 +44,21 @@ pub struct PeekTrailersBody { trailers: Option, B::Error>>, } -pub type WithPeekTrailersBody = Either< - futures::future::Ready>>, - Pin>> + Send + 'static>>, ->; +/// A future that yields a response instrumented with [`PeekTrailersBody`]. +pub type WithPeekTrailersBody = Either, ReadingResponse>; +/// A future that immediately yields a response. +type ReadyResponse = future::Ready>>; +/// A boxed future that must poll a body before yielding a response. +type ReadingResponse = + Pin>> + Send + 'static>>; // === impl WithTrailers === impl PeekTrailersBody { + /// Returns a reference to the body's trailers, if available. + /// + /// This function will return `None` if the body's trailers could not be peeked, or if there + /// were no trailers included. pub fn peek_trailers(&self) -> Option<&http::HeaderMap> { self.trailers .as_ref() @@ -74,48 +87,56 @@ impl PeekTrailersBody { } // Otherwise, return a future that tries to read the next frame. - Either::Right(Box::pin(Self::read_response(rsp))) + Either::Right(Box::pin(async move { + let (parts, body) = rsp.into_parts(); + let body = Self::read_body(body).await; + http::Response::from_parts(parts, body) + })) } - async fn read_response(rsp: http::Response) -> http::Response + async fn read_body(mut body: B) -> Self where B: Send + Unpin, B::Data: Send + Unpin, B::Error: Send, { - let (parts, body) = rsp.into_parts(); - let mut body = Self { - inner: body, - first_data: None, - trailers: None, - }; - + // First, poll the body for its first frame. tracing::debug!("Buffering first data frame"); - if let Some(data) = body.inner.data().await { - // The body has data; stop waiting for trailers. - body.first_data = Some(data); - - // Peek to see if there's immediately a trailers frame, and grab - // it if so. Otherwise, bail. - // XXX(eliza): the documentation for the `http::Body` trait says - // that `poll_trailers` should only be called after `poll_data` - // returns `None`...but, in practice, I'm fairly sure that this just - // means that it *will not return `Ready`* until there are no data - // frames left, which is fine for us here, because we `now_or_never` - // it. - body.trailers = body.inner.trailers().now_or_never(); + let first_data = body.data().await; + + // Now, inspect the frame yielded. If the body yielded a data frame, we will only peek + // the trailers if they are immediately available. If the body did not yield a data frame, + // we will poll a future to yield the trailers. + let trailers = if first_data.is_some() { + // The body has data; stop waiting for trailers. Peek to see if there's immediately a + // trailers frame, and grab it if so. Otherwise, bail. + // + // XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers` + // should only be called after `poll_data` returns `None`...but, in practice, I'm + // fairly sure that this just means that it *will not return `Ready`* until there are + // no data frames left, which is fine for us here, because we `now_or_never` it. + body.trailers().now_or_never() } else { - // Okay, `poll_data` has returned `None`, so there are no data - // frames left. Let's see if there's trailers... - body.trailers = Some(body.inner.trailers().await); - } - if body.trailers.is_some() { + // Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see + // if there's trailers... + let trls = body.trailers().await; + Some(trls) + }; + + if trailers.is_some() { tracing::debug!("Buffered trailers frame"); } - http::Response::from_parts(parts, body) + Self { + inner: body, + first_data, + trailers, + } } + /// Returns a response with an inert [`PeekTrailersBody`]. + /// + /// This will not peek the inner body's trailers. fn no_trailers(rsp: http::Response) -> http::Response { rsp.map(|inner| Self { inner, @@ -138,24 +159,34 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let this = self.get_mut(); - if let Some(first_data) = this.first_data.take() { + let Projection { + inner, + first_data, + trailers: _, + } = self.project(); + + if let Some(first_data) = first_data.take() { return Poll::Ready(Some(first_data)); } - Pin::new(&mut this.inner).poll_data(cx) + inner.poll_data(cx) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let this = self.get_mut(); - if let Some(trailers) = this.trailers.take() { + let Projection { + inner, + first_data: _, + trailers, + } = self.project(); + + if let Some(trailers) = trailers.take() { return Poll::Ready(trailers); } - Pin::new(&mut this.inner).poll_trailers(cx) + inner.poll_trailers(cx) } #[inline] @@ -181,3 +212,185 @@ where hint } } + +#[cfg(test)] +mod tests { + use super::PeekTrailersBody; + use bytes::Bytes; + use http::{HeaderMap, HeaderValue}; + use http_body::Body; + use linkerd_error::Error; + use std::{ + collections::VecDeque, + ops::Not, + pin::Pin, + task::{Context, Poll}, + }; + + /// A "mock" body. + /// + /// This type contains polling results for [`Body`]. + #[derive(Default)] + struct MockBody { + data_polls: VecDeque>>>, + trailer_polls: VecDeque, Error>>>, + } + + fn data() -> Option> { + let bytes = Bytes::from_static(b"hello"); + Some(Ok(bytes)) + } + + fn trailers() -> Result, Error> { + let mut trls = HeaderMap::with_capacity(1); + let value = HeaderValue::from_static("shiny"); + trls.insert("trailer", value); + Ok(Some(trls)) + } + + #[tokio::test] + async fn cannot_peek_empty() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let empty = MockBody::default(); + let peek = PeekTrailersBody::read_body(empty).await; + assert!(peek.peek_trailers().is_none()); + // TODO(kate): this will not return `true`. + // assert!(peek.is_end_stream()); + } + + #[tokio::test] + async fn peeks_only_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let only_trailers = MockBody::default().then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(only_trailers).await; + assert!(peek.peek_trailers().is_some()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn peeks_one_frame_with_immediate_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_some()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn cannot_peek_one_frame_with_eventual_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Pending) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_none()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn peeks_one_eventual_frame_with_immediate_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Pending) + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_some()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn cannot_peek_two_frames_with_immediate_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Ready(data())) + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_none()); + assert!(peek.is_end_stream().not()); + } + + // === impl MockBody === + + impl MockBody { + /// Appends a poll outcome for [`Body::poll_data()`]. + fn then_yield_data(mut self, poll: Poll>>) -> Self { + self.data_polls.push_back(poll); + self + } + + /// Appends a poll outcome for [`Body::poll_trailers()`]. + fn then_yield_trailer( + mut self, + poll: Poll, Error>>, + ) -> Self { + self.trailer_polls.push_back(poll); + self + } + + /// Schedules a task to be awoken. + fn schedule(cx: &Context<'_>) { + let waker = cx.waker().clone(); + tokio::spawn(async move { + waker.wake(); + }); + } + } + + impl Body for MockBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let poll = self + .get_mut() + .data_polls + .pop_front() + .unwrap_or(Poll::Ready(None)); + // If we return `Poll::Pending`, we must schedule the task to be awoken. + if poll.is_pending() { + Self::schedule(cx); + } + poll + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let Self { + data_polls, + trailer_polls, + } = self.get_mut(); + + let poll = if data_polls.is_empty() { + trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None))) + } else { + // If the data frames have not all been yielded, yield `Pending`. + // + // TODO(kate): this arm should panic. it indicates `PeekTrailersBody` isn't + // respecting the contract outlined in + // . + Poll::Pending + }; + + // If we return `Poll::Pending`, we must schedule the task to be awoken. + if poll.is_pending() { + Self::schedule(cx); + } + + poll + } + + fn is_end_stream(&self) -> bool { + self.data_polls.is_empty() && self.trailer_polls.is_empty() + } + } +}