From 8169b3e5fccbff9cd51828b25f708500cd172adf Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 1/3] docs(http/retry): document `PeekTrailersBody` interfaces this is a squashed commit containing the following: --- docs(http/retry): document `PeekTrailersBody::inner` Signed-off-by: katelyn martin docs(http/retry): document `PeekTrailersBody::peek_trailers()` Signed-off-by: katelyn martin docs(http/retry): document `PeekTrailersBody::no_trailers()` Signed-off-by: katelyn martin --- linkerd/http/retry/src/peek_trailers.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index 868857b71f..a2a362d537 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -16,6 +16,9 @@ use std::{ /// If the first frame of the body stream was *not* a `TRAILERS` frame, this /// behaves identically to a normal body. pub struct PeekTrailersBody { + /// The inner [`Body`]. + /// + /// This is the request or response body whose trailers are being peeked. inner: B, /// The first DATA frame received from the inner body, or an error that @@ -46,6 +49,10 @@ pub type WithPeekTrailersBody = Either< // === impl WithTrailers === impl PeekTrailersBody { + /// Returns a reference to the body's trailers, if available. + /// + /// This function will return `None` if the body's trailers could not be peeked, or if there + /// were no trailers included. pub fn peek_trailers(&self) -> Option<&http::HeaderMap> { self.trailers .as_ref() @@ -116,6 +123,9 @@ impl PeekTrailersBody { http::Response::from_parts(parts, body) } + /// Returns a response with an inert [`PeekTrailersBody`]. + /// + /// This will not peek the inner body's trailers. 
fn no_trailers(rsp: http::Response) -> http::Response { rsp.map(|inner| Self { inner, From 89497b4c24b133d37ce68ec919b79fd73c4e2f45 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 2/3] refactor(http/retry): refactor `PeekTrailersBody` logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit this is a squashed commit containing the following: --- refactor(http/retry): decompose `WithPeekTrailersBody` type alias this commit breaks this large type out into two halves. this is a purely cosmetic change. Signed-off-by: katelyn martin refactor(http/retry): `PeekTrailersBody` is pin projected we must pass our `Pin`'edness down to the inner `B`-typed body for `PeekTrailersBody` to itself implement `http_body::Body`. this commit tweaks the existing code to rely on the `pin-project` library. this generates a `project()` method to pin inner fields whose `poll_data()` and `poll_trailers()` functions we delegate to. this is a noöp change. Signed-off-by: katelyn martin refactor(http/retry): defer construction of `PeekTrailersBody` this commit refactors the polling logic in `PeekTrailersBody::read_response`. this commit makes some subtle changes with the migration to hyper 1.0 in mind, to make this function more easily portable to the new signature of `http_body::Body`. see https://github.com/linkerd/linkerd2/issues/8733 for more information. this commit defers the `Self` construction of the `PeekTrailersBody` body. this means that the control flow does not need to reach through to e.g. `body.inner` to poll the inner body being peeked. additionally, it provides us with `let` bindings for the first data frame yielded, and the trailers frame yielded thereafter. this is largely cosmetic, but will make it easier to deal with the additional matching we'll need to do when there is a single polling function that yields us `Frame` objects. 
Signed-off-by: katelyn martin refactor(http/retry): `PeekTrailersBody` transforms `B` this is a small structural change to the `PeekTrailersBody::read_response()` function to facilitate writing some unit tests. rather than transforming a `Response<B>` into a `Response<PeekTrailersBody<B>>`, we hoist the `Response::into_parts()` and `Response::from_parts()` calls up. `read_response()` is renamed to `read_body()`. Signed-off-by: katelyn martin --- Cargo.lock | 1 + linkerd/http/retry/Cargo.toml | 1 + linkerd/http/retry/src/peek_trailers.rs | 97 +++++++++++++++---------- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a855ad89a..b5dbbdb225 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1806,6 +1806,7 @@ dependencies = [ "linkerd-stack", "linkerd-tracing", "parking_lot", + "pin-project", "thiserror 2.0.11", "tokio", "tower", diff --git a/linkerd/http/retry/Cargo.toml b/linkerd/http/retry/Cargo.toml index a59da0d9ab..09e15bd356 100644 --- a/linkerd/http/retry/Cargo.toml +++ b/linkerd/http/retry/Cargo.toml @@ -12,6 +12,7 @@ futures = { version = "0.3", default-features = false } http-body = { workspace = true } http = { workspace = true } parking_lot = "0.12" +pin-project = "1" tokio = { version = "1", features = ["macros", "rt"] } tower = { version = "0.4", features = ["retry"] } tracing = "0.1" diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index a2a362d537..c221fa4e83 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -4,6 +4,7 @@ use futures::{ }; use http_body::Body; use linkerd_http_box::BoxBody; +use pin_project::pin_project; use std::{ future::Future, pin::Pin, @@ -15,10 +16,12 @@ use std::{ /// /// If the first frame of the body stream was *not* a `TRAILERS` frame, this /// behaves identically to a normal body. +#[pin_project(project = Projection)] pub struct PeekTrailersBody { /// The inner [`Body`]. 
/// /// This is the request or response body whose trailers are being peeked. + #[pin] inner: B, /// The first DATA frame received from the inner body, or an error that @@ -41,10 +44,13 @@ pub struct PeekTrailersBody { trailers: Option, B::Error>>, } -pub type WithPeekTrailersBody = Either< - futures::future::Ready>>, - Pin>> + Send + 'static>>, ->; +/// A future that yields a response instrumented with [`PeekTrailersBody`]. +pub type WithPeekTrailersBody = Either, ReadingResponse>; +/// A future that immediately yields a response. +type ReadyResponse = future::Ready>>; +/// A boxed future that must poll a body before yielding a response. +type ReadingResponse = + Pin>> + Send + 'static>>; // === impl WithTrailers === @@ -81,46 +87,51 @@ impl PeekTrailersBody { } // Otherwise, return a future that tries to read the next frame. - Either::Right(Box::pin(Self::read_response(rsp))) + Either::Right(Box::pin(async move { + let (parts, body) = rsp.into_parts(); + let body = Self::read_body(body).await; + http::Response::from_parts(parts, body) + })) } - async fn read_response(rsp: http::Response) -> http::Response + async fn read_body(mut body: B) -> Self where B: Send + Unpin, B::Data: Send + Unpin, B::Error: Send, { - let (parts, body) = rsp.into_parts(); - let mut body = Self { - inner: body, - first_data: None, - trailers: None, - }; - + // First, poll the body for its first frame. tracing::debug!("Buffering first data frame"); - if let Some(data) = body.inner.data().await { - // The body has data; stop waiting for trailers. - body.first_data = Some(data); - - // Peek to see if there's immediately a trailers frame, and grab - // it if so. Otherwise, bail. 
- // XXX(eliza): the documentation for the `http::Body` trait says - // that `poll_trailers` should only be called after `poll_data` - // returns `None`...but, in practice, I'm fairly sure that this just - // means that it *will not return `Ready`* until there are no data - // frames left, which is fine for us here, because we `now_or_never` - // it. - body.trailers = body.inner.trailers().now_or_never(); + let first_data = body.data().await; + + // Now, inspect the frame yielded. If the body yielded a data frame, we will only peek + // the trailers if they are immediately available. If the body did not yield a data frame, + // we will poll a future to yield the trailers. + let trailers = if first_data.is_some() { + // The body has data; stop waiting for trailers. Peek to see if there's immediately a + // trailers frame, and grab it if so. Otherwise, bail. + // + // XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers` + // should only be called after `poll_data` returns `None`...but, in practice, I'm + // fairly sure that this just means that it *will not return `Ready`* until there are + // no data frames left, which is fine for us here, because we `now_or_never` it. + body.trailers().now_or_never() } else { - // Okay, `poll_data` has returned `None`, so there are no data - // frames left. Let's see if there's trailers... - body.trailers = Some(body.inner.trailers().await); - } - if body.trailers.is_some() { + // Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see + // if there's trailers... + let trls = body.trailers().await; + Some(trls) + }; + + if trailers.is_some() { tracing::debug!("Buffered trailers frame"); } - http::Response::from_parts(parts, body) + Self { + inner: body, + first_data, + trailers, + } } /// Returns a response with an inert [`PeekTrailersBody`]. 
@@ -148,24 +159,34 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let this = self.get_mut(); - if let Some(first_data) = this.first_data.take() { + let Projection { + inner, + first_data, + trailers: _, + } = self.project(); + + if let Some(first_data) = first_data.take() { return Poll::Ready(Some(first_data)); } - Pin::new(&mut this.inner).poll_data(cx) + inner.poll_data(cx) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let this = self.get_mut(); - if let Some(trailers) = this.trailers.take() { + let Projection { + inner, + first_data: _, + trailers, + } = self.project(); + + if let Some(trailers) = trailers.take() { return Poll::Ready(trailers); } - Pin::new(&mut this.inner).poll_trailers(cx) + inner.poll_trailers(cx) } #[inline] From 9be9042748cb4048a1f24103b77e80d41997a1f2 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 3/3] feat(http/retry): unit test suite for `PeekTrailersBody` `PeekTrailersBody` contains some subtle edge cases related to the number of DATA frames yielded by the inner body, and how persistent it will be about polling for TRAILERS frames. for example, if it yields a DATA frame, it will not await trailers being available, but it *will* do so if the inner body does not yield a DATA frame. if a DATA frame is yielded, it will check for a TRAILERS frame, but it must be immediately available. this is all subtle, and particularly subject to change with the upgrade to http-body 1.0's frame-oriented `Body` interface. so, this commit introduces a test suite for `PeekTrailersBody`. it includes assertions to confirm when the peek middleware can and cannot observe the trailers. some `TODO(kate)` comments are left where issues exist. 
Signed-off-by: katelyn martin --- linkerd/http/retry/src/peek_trailers.rs | 182 ++++++++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index c221fa4e83..fd1a8a3589 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -212,3 +212,185 @@ where hint } } + +#[cfg(test)] +mod tests { + use super::PeekTrailersBody; + use bytes::Bytes; + use http::{HeaderMap, HeaderValue}; + use http_body::Body; + use linkerd_error::Error; + use std::{ + collections::VecDeque, + ops::Not, + pin::Pin, + task::{Context, Poll}, + }; + + /// A "mock" body. + /// + /// This type contains polling results for [`Body`]. + #[derive(Default)] + struct MockBody { + data_polls: VecDeque>>>, + trailer_polls: VecDeque, Error>>>, + } + + fn data() -> Option> { + let bytes = Bytes::from_static(b"hello"); + Some(Ok(bytes)) + } + + fn trailers() -> Result, Error> { + let mut trls = HeaderMap::with_capacity(1); + let value = HeaderValue::from_static("shiny"); + trls.insert("trailer", value); + Ok(Some(trls)) + } + + #[tokio::test] + async fn cannot_peek_empty() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let empty = MockBody::default(); + let peek = PeekTrailersBody::read_body(empty).await; + assert!(peek.peek_trailers().is_none()); + // TODO(kate): this will not return `true`. 
+ // assert!(peek.is_end_stream()); + } + + #[tokio::test] + async fn peeks_only_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let only_trailers = MockBody::default().then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(only_trailers).await; + assert!(peek.peek_trailers().is_some()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn peeks_one_frame_with_immediate_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_some()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn cannot_peek_one_frame_with_eventual_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Pending) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_none()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn peeks_one_eventual_frame_with_immediate_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Pending) + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_some()); + assert!(peek.is_end_stream().not()); + } + + #[tokio::test] + async fn cannot_peek_two_frames_with_immediate_trailers() { + let (_guard, _handle) = linkerd_tracing::test::trace_init(); + let body = MockBody::default() + .then_yield_data(Poll::Ready(data())) + .then_yield_data(Poll::Ready(data())) + .then_yield_trailer(Poll::Ready(trailers())); + let peek = 
PeekTrailersBody::read_body(body).await; + assert!(peek.peek_trailers().is_none()); + assert!(peek.is_end_stream().not()); + } + + // === impl MockBody === + + impl MockBody { + /// Appends a poll outcome for [`Body::poll_data()`]. + fn then_yield_data(mut self, poll: Poll>>) -> Self { + self.data_polls.push_back(poll); + self + } + + /// Appends a poll outcome for [`Body::poll_trailers()`]. + fn then_yield_trailer( + mut self, + poll: Poll, Error>>, + ) -> Self { + self.trailer_polls.push_back(poll); + self + } + + /// Schedules a task to be awoken. + fn schedule(cx: &Context<'_>) { + let waker = cx.waker().clone(); + tokio::spawn(async move { + waker.wake(); + }); + } + } + + impl Body for MockBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let poll = self + .get_mut() + .data_polls + .pop_front() + .unwrap_or(Poll::Ready(None)); + // If we return `Poll::Pending`, we must schedule the task to be awoken. + if poll.is_pending() { + Self::schedule(cx); + } + poll + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let Self { + data_polls, + trailer_polls, + } = self.get_mut(); + + let poll = if data_polls.is_empty() { + trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None))) + } else { + // If the data frames have not all been yielded, yield `Pending`. + // + // TODO(kate): this arm should panic. it indicates `PeekTrailersBody` isn't + // respecting the contract outlined in + // . + Poll::Pending + }; + + // If we return `Poll::Pending`, we must schedule the task to be awoken. + if poll.is_pending() { + Self::schedule(cx); + } + + poll + } + + fn is_end_stream(&self) -> bool { + self.data_polls.is_empty() && self.trailer_polls.is_empty() + } + } +}