Skip to content

Commit ba9231d

Browse files
committed
refactor(app/core): prepare rescue body for http-body upgrade
this commit makes some minor alterations to our error recovery body middleware. see linkerd/linkerd2#8733 for more information. this commit removes an `assert!` statement from the implementation of `<Response<R, B> as Body>::poll_data()`. see the documentation of `Body::poll_frame()`: > Once the end of the stream is reached, implementations should > continue to return [`Poll::Ready(None)`]. hyperium/http-body@1090bff#diff-33aabe8c2aaa7614022addf244245e09bbff576a67a9ae3c6938c8a868201d36R60-R61 to do this, this commit introduces a distinct terminal state `Inner::Rescued` to represent when the underlying `B`-typed body has yielded an error and been rescued. once in this state the body will yield no more data frames, instead yielding a collection of trailers describing the mid-stream error that was encountered by the underlying body. the call to `R::rescue` is also moved down into the helper function fka `grpc_trailers()`. this helps the function follow the grain of our "state machine" a little more directly. see #3615, #3614, and #3611 for pretext to this change.
1 parent d2425c8 commit ba9231d

File tree

1 file changed

+61
-37
lines changed

1 file changed

+61
-37
lines changed

linkerd/app/core/src/errors/body.rs

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::{
22
header::{GRPC_MESSAGE, GRPC_STATUS},
33
respond::{HttpRescue, SyntheticHttpResponse},
44
};
5-
use http::header::HeaderValue;
5+
use http::{header::HeaderValue, HeaderMap};
66
use linkerd_error::{Error, Result};
77
use pin_project::pin_project;
88
use std::{
@@ -20,14 +20,18 @@ pub struct ResponseBody<R, B>(#[pin] Inner<R, B>);
2020

2121
#[pin_project(project = InnerProj)]
2222
enum Inner<R, B> {
23+
/// An inert body that delegates directly down to the underlying body `B`.
2324
Passthru(#[pin] B),
25+
/// A body that will be rescued if it yields an error.
2426
GrpcRescue {
2527
#[pin]
2628
inner: B,
27-
trailers: Option<http::HeaderMap>,
29+
/// An error response [strategy][HttpRescue].
2830
rescue: R,
2931
emit_headers: bool,
3032
},
33+
/// The underlying body `B` yielded an error and was "rescued".
34+
Rescued { trailers: Option<http::HeaderMap> },
3135
}
3236

3337
// === impl ResponseBody ===
@@ -44,7 +48,6 @@ impl<R, B> ResponseBody<R, B> {
4448
inner,
4549
rescue,
4650
emit_headers,
47-
trailers: None,
4851
})
4952
}
5053
}
@@ -64,34 +67,27 @@ where
6467
type Error = B::Error;
6568

6669
fn poll_data(
67-
self: Pin<&mut Self>,
70+
mut self: Pin<&mut Self>,
6871
cx: &mut Context<'_>,
6972
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
70-
let ResponseBodyProj(inner) = self.project();
73+
let ResponseBodyProj(inner) = self.as_mut().project();
7174
match inner.project() {
7275
InnerProj::Passthru(inner) => inner.poll_data(cx),
76+
InnerProj::Rescued { trailers: _ } => Poll::Ready(None),
7377
InnerProj::GrpcRescue {
7478
inner,
75-
trailers,
7679
rescue,
7780
emit_headers,
78-
} => {
79-
// should not be calling poll_data if we have set trailers derived from an error
80-
assert!(trailers.is_none());
81-
match inner.poll_data(cx) {
82-
Poll::Ready(Some(Err(error))) => {
83-
let SyntheticHttpResponse {
84-
grpc_status,
85-
message,
86-
..
87-
} = rescue.rescue(error)?;
88-
let t = Self::grpc_trailers(grpc_status, &message, *emit_headers);
89-
*trailers = Some(t);
90-
Poll::Ready(None)
91-
}
92-
data => data,
81+
} => match inner.poll_data(cx) {
82+
Poll::Ready(Some(Err(error))) => {
83+
// The inner body has yielded an error, which we will try to rescue. If so,
84+
// store our synthetic trailers reporting the error.
85+
let trailers = Self::rescue(error, rescue, *emit_headers)?;
86+
self.set_rescued(trailers);
87+
Poll::Ready(None)
9388
}
94-
}
89+
data => data,
90+
},
9591
}
9692
}
9793

@@ -103,12 +99,8 @@ where
10399
let ResponseBodyProj(inner) = self.project();
104100
match inner.project() {
105101
InnerProj::Passthru(inner) => inner.poll_trailers(cx),
106-
InnerProj::GrpcRescue {
107-
inner, trailers, ..
108-
} => match trailers.take() {
109-
Some(t) => Poll::Ready(Ok(Some(t))),
110-
None => inner.poll_trailers(cx),
111-
},
102+
InnerProj::GrpcRescue { inner, .. } => inner.poll_trailers(cx),
103+
InnerProj::Rescued { trailers } => Poll::Ready(Ok(trailers.take())),
112104
}
113105
}
114106

@@ -117,9 +109,8 @@ where
117109
let Self(inner) = self;
118110
match inner {
119111
Inner::Passthru(inner) => inner.is_end_stream(),
120-
Inner::GrpcRescue {
121-
inner, trailers, ..
122-
} => trailers.is_none() && inner.is_end_stream(),
112+
Inner::GrpcRescue { inner, .. } => inner.is_end_stream(),
113+
Inner::Rescued { trailers } => trailers.is_none(),
123114
}
124115
}
125116

@@ -129,25 +120,58 @@ where
129120
match inner {
130121
Inner::Passthru(inner) => inner.size_hint(),
131122
Inner::GrpcRescue { inner, .. } => inner.size_hint(),
123+
Inner::Rescued { .. } => http_body::SizeHint::with_exact(0),
132124
}
133125
}
134126
}
135127

136-
impl<R, B> ResponseBody<R, B> {
137-
fn grpc_trailers(code: tonic::Code, message: &str, emit_headers: bool) -> http::HeaderMap {
138-
debug!(grpc.status = ?code, "Synthesizing gRPC trailers");
128+
impl<R, B> ResponseBody<R, B>
129+
where
130+
B: http_body::Body,
131+
R: HttpRescue<B::Error>,
132+
{
133+
/// Maps an error yielded by the inner body to a collection of gRPC trailers.
134+
///
135+
/// This function returns `Ok(trailers)` if the given [`HttpRescue<E>`] strategy could identify
136+
/// a cause for an error yielded by the inner `B`-typed body.
137+
fn rescue(
138+
error: B::Error,
139+
rescue: &R,
140+
emit_headers: bool,
141+
) -> Result<http::HeaderMap, B::Error> {
142+
let SyntheticHttpResponse {
143+
grpc_status,
144+
message,
145+
..
146+
} = rescue.rescue(error)?;
147+
148+
debug!(grpc.status = ?grpc_status, "Synthesizing gRPC trailers");
139149
let mut t = http::HeaderMap::new();
140-
t.insert(GRPC_STATUS, super::code_header(code));
150+
t.insert(GRPC_STATUS, super::code_header(grpc_status));
141151
if emit_headers {
152+
// A gRPC message trailer is only included if instructed to emit additional headers.
142153
t.insert(
143154
GRPC_MESSAGE,
144-
HeaderValue::from_str(message).unwrap_or_else(|error| {
155+
HeaderValue::from_str(&message).unwrap_or_else(|error| {
145156
warn!(%error, "Failed to encode error header");
146157
HeaderValue::from_static("Unexpected error")
147158
}),
148159
);
149160
}
150-
t
161+
162+
Ok(t)
163+
}
164+
}
165+
166+
impl<R, B> ResponseBody<R, B> {
167+
/// Marks this body as "rescued".
168+
///
169+
/// No more data frames will be yielded, and the given trailers will be returned when this
170+
/// body is polled.
171+
fn set_rescued(mut self: Pin<&mut Self>, trailers: HeaderMap) {
172+
let trailers = Some(trailers);
173+
let new = Self(Inner::Rescued { trailers });
174+
self.set(new);
151175
}
152176
}
153177

0 commit comments

Comments
 (0)