Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,7 @@ dependencies = [
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"thiserror 2.0.11",
"tokio",
"tower",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ futures = { version = "0.3", default-features = false }
http-body = { workspace = true }
http = { workspace = true }
parking_lot = "0.12"
pin-project = "1"
tokio = { version = "1", features = ["macros", "rt"] }
tower = { version = "0.4", features = ["retry"] }
tracing = "0.1"
Expand Down
289 changes: 251 additions & 38 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::{
};
use http_body::Body;
use linkerd_http_box::BoxBody;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
Expand All @@ -15,7 +16,12 @@ use std::{
///
/// If the first frame of the body stream was *not* a `TRAILERS` frame, this
/// behaves identically to a normal body.
#[pin_project(project = Projection)]
pub struct PeekTrailersBody<B: Body = BoxBody> {
/// The inner [`Body`].
///
/// This is the request or response body whose trailers are being peeked.
#[pin]
inner: B,

/// The first DATA frame received from the inner body, or an error that
Expand All @@ -38,14 +44,21 @@ pub struct PeekTrailersBody<B: Body = BoxBody> {
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
}

pub type WithPeekTrailersBody<B> = Either<
futures::future::Ready<http::Response<PeekTrailersBody<B>>>,
Pin<Box<dyn Future<Output = http::Response<PeekTrailersBody<B>>> + Send + 'static>>,
>;
/// A future that yields a response instrumented with [`PeekTrailersBody<B>`].
pub type WithPeekTrailersBody<B> = Either<ReadyResponse<B>, ReadingResponse<B>>;
/// A future that immediately yields a response.
type ReadyResponse<B> = future::Ready<http::Response<PeekTrailersBody<B>>>;
/// A boxed future that must poll a body before yielding a response.
type ReadingResponse<B> =
Pin<Box<dyn Future<Output = http::Response<PeekTrailersBody<B>>> + Send + 'static>>;

// === impl WithTrailers ===

impl<B: Body> PeekTrailersBody<B> {
/// Returns a reference to the body's trailers, if available.
///
/// This function will return `None` if the body's trailers could not be peeked, or if there
/// were no trailers included.
pub fn peek_trailers(&self) -> Option<&http::HeaderMap> {
self.trailers
.as_ref()
Expand Down Expand Up @@ -74,48 +87,56 @@ impl<B: Body> PeekTrailersBody<B> {
}

// Otherwise, return a future that tries to read the next frame.
Either::Right(Box::pin(Self::read_response(rsp)))
Either::Right(Box::pin(async move {
let (parts, body) = rsp.into_parts();
let body = Self::read_body(body).await;
http::Response::from_parts(parts, body)
}))
}

async fn read_response(rsp: http::Response<B>) -> http::Response<Self>
async fn read_body(mut body: B) -> Self
where
B: Send + Unpin,
B::Data: Send + Unpin,
B::Error: Send,
{
let (parts, body) = rsp.into_parts();
let mut body = Self {
inner: body,
first_data: None,
trailers: None,
};

// First, poll the body for its first frame.
tracing::debug!("Buffering first data frame");
if let Some(data) = body.inner.data().await {
// The body has data; stop waiting for trailers.
body.first_data = Some(data);

// Peek to see if there's immediately a trailers frame, and grab
// it if so. Otherwise, bail.
// XXX(eliza): the documentation for the `http::Body` trait says
// that `poll_trailers` should only be called after `poll_data`
// returns `None`...but, in practice, I'm fairly sure that this just
// means that it *will not return `Ready`* until there are no data
// frames left, which is fine for us here, because we `now_or_never`
// it.
body.trailers = body.inner.trailers().now_or_never();
let first_data = body.data().await;

// Now, inspect the frame yielded. If the body yielded a data frame, we will only peek
// the trailers if they are immediately available. If the body did not yield a data frame,
// we will poll a future to yield the trailers.
let trailers = if first_data.is_some() {
// The body has data; stop waiting for trailers. Peek to see if there's immediately a
// trailers frame, and grab it if so. Otherwise, bail.
//
// XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers`
// should only be called after `poll_data` returns `None`...but, in practice, I'm
// fairly sure that this just means that it *will not return `Ready`* until there are
// no data frames left, which is fine for us here, because we `now_or_never` it.
body.trailers().now_or_never()
} else {
// Okay, `poll_data` has returned `None`, so there are no data
// frames left. Let's see if there's trailers...
body.trailers = Some(body.inner.trailers().await);
}
if body.trailers.is_some() {
// Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see
// if there's trailers...
let trls = body.trailers().await;
Some(trls)
};

if trailers.is_some() {
tracing::debug!("Buffered trailers frame");
}

http::Response::from_parts(parts, body)
Self {
inner: body,
first_data,
trailers,
}
}

/// Returns a response with an inert [`PeekTrailersBody<B>`].
///
/// This will not peek the inner body's trailers.
fn no_trailers(rsp: http::Response<B>) -> http::Response<Self> {
rsp.map(|inner| Self {
inner,
Expand All @@ -138,24 +159,34 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.get_mut();
if let Some(first_data) = this.first_data.take() {
let Projection {
inner,
first_data,
trailers: _,
} = self.project();

if let Some(first_data) = first_data.take() {
return Poll::Ready(Some(first_data));
}

Pin::new(&mut this.inner).poll_data(cx)
inner.poll_data(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.get_mut();
if let Some(trailers) = this.trailers.take() {
let Projection {
inner,
first_data: _,
trailers,
} = self.project();

if let Some(trailers) = trailers.take() {
return Poll::Ready(trailers);
}

Pin::new(&mut this.inner).poll_trailers(cx)
inner.poll_trailers(cx)
}

#[inline]
Expand All @@ -181,3 +212,185 @@ where
hint
}
}

#[cfg(test)]
mod tests {
use super::PeekTrailersBody;
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use linkerd_error::Error;
use std::{
collections::VecDeque,
ops::Not,
pin::Pin,
task::{Context, Poll},
};

/// A "mock" body.
///
/// This type contains polling results for [`Body`].
#[derive(Default)]
struct MockBody {
data_polls: VecDeque<Poll<Option<Result<Bytes, Error>>>>,
trailer_polls: VecDeque<Poll<Result<Option<http::HeaderMap>, Error>>>,
}

fn data() -> Option<Result<Bytes, Error>> {
let bytes = Bytes::from_static(b"hello");
Some(Ok(bytes))
}

fn trailers() -> Result<Option<http::HeaderMap>, Error> {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("shiny");
trls.insert("trailer", value);
Ok(Some(trls))
}

#[tokio::test]
async fn cannot_peek_empty() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let empty = MockBody::default();
let peek = PeekTrailersBody::read_body(empty).await;
assert!(peek.peek_trailers().is_none());
// TODO(kate): this will not return `true`.
// assert!(peek.is_end_stream());
}

#[tokio::test]
async fn peeks_only_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let only_trailers = MockBody::default().then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(only_trailers).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn peeks_one_frame_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn cannot_peek_one_frame_with_eventual_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Pending)
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn peeks_one_eventual_frame_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Pending)
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn cannot_peek_two_frames_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

// === impl MockBody ===

impl MockBody {
/// Appends a poll outcome for [`Body::poll_data()`].
fn then_yield_data(mut self, poll: Poll<Option<Result<Bytes, Error>>>) -> Self {
self.data_polls.push_back(poll);
self
}

/// Appends a poll outcome for [`Body::poll_trailers()`].
fn then_yield_trailer(
mut self,
poll: Poll<Result<Option<http::HeaderMap>, Error>>,
) -> Self {
self.trailer_polls.push_back(poll);
self
}

/// Schedules a task to be awoken.
fn schedule(cx: &Context<'_>) {
let waker = cx.waker().clone();
tokio::spawn(async move {
waker.wake();
});
}
}

impl Body for MockBody {
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = self
.get_mut()
.data_polls
.pop_front()
.unwrap_or(Poll::Ready(None));
// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}
poll
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Self {
data_polls,
trailer_polls,
} = self.get_mut();

let poll = if data_polls.is_empty() {
trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None)))
} else {
// If the data frames have not all been yielded, yield `Pending`.
//
// TODO(kate): this arm should panic. it indicates `PeekTrailersBody<B>` isn't
// respecting the contract outlined in
// <https://docs.rs/http-body/0.4.6/http_body/trait.Body.html#tymethod.poll_trailers>.
Poll::Pending
};

// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}

poll
}

fn is_end_stream(&self) -> bool {
self.data_polls.is_empty() && self.trailer_polls.is_empty()
}
}
}
Loading