Skip to content
Merged
142 changes: 83 additions & 59 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use linkerd_app_core::{
errors::respond::L5D_PROXY_ERROR,
identity, io, metrics,
proxy::http,
svc::{self, NewService, Param},
svc::{self, http::TracingExecutor, NewService, Param},
tls,
transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr},
NameAddr, ProxyRuntime,
Error, NameAddr, ProxyRuntime,
};
use linkerd_app_test::connect::ConnectFuture;
use linkerd_tracing::test::trace_init;
Expand Down Expand Up @@ -47,9 +47,7 @@ where

#[tokio::test(flavor = "current_thread")]
async fn unmeshed_http1_hello_world() {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
let _trace = trace_init();
Expand Down Expand Up @@ -82,15 +80,18 @@ async fn unmeshed_http1_hello_world() {
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, you can use core::Result so this is just .collect::<Result<Vec<()>>>()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i took a swing at this, but i think i'll opt to elide this.

i have a personal aversion to importing Result aliases that overwrite the Result<T, E> included in the standard prelude, and found that the alternate .collect::<linkerd_app_core::Result<Vec<()>>>() form didn't really golf this down much.

.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
async fn downgrade_origin_form() {
// Reproduces https://github.com/linkerd/linkerd2/issues/5298
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
Expand Down Expand Up @@ -126,14 +127,17 @@ async fn downgrade_origin_form() {
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
async fn downgrade_absolute_form() {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
Expand Down Expand Up @@ -169,7 +173,12 @@ async fn downgrade_absolute_form() {
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -211,7 +220,12 @@ async fn http1_bad_gateway_meshed_response_error_header() {
// logical error context is added.
check_error_header(rsp.headers(), "server is not listening");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -252,7 +266,12 @@ async fn http1_bad_gateway_unmeshed_response() {
"response must not contain L5D_PROXY_ERROR header"
);

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -262,9 +281,7 @@ async fn http1_connect_timeout_meshed_response_error_header() {

// Build a mock connect that sleeps longer than the default inbound
// connect timeout.
#[allow(deprecated)] // linkerd/linkerd2#8733
let server = hyper::server::conn::Http::new();
let connect = support::connect().endpoint(Target::addr(), connect_timeout(server));
let connect = support::connect().endpoint(Target::addr(), connect_timeout());

// Build a client using the connect that always sleeps so that responses
// are GATEWAY_TIMEOUT.
Expand Down Expand Up @@ -299,7 +316,12 @@ async fn http1_connect_timeout_meshed_response_error_header() {
// logical error context is added.
check_error_header(rsp.headers(), "connect timed out after 1s");

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -309,9 +331,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {

// Build a mock connect that sleeps longer than the default inbound
// connect timeout.
#[allow(deprecated)] // linkerd/linkerd2#8733
let server = hyper::server::conn::Http::new();
let connect = support::connect().endpoint(Target::addr(), connect_timeout(server));
let connect = support::connect().endpoint(Target::addr(), connect_timeout());

// Build a client using the connect that always sleeps so that responses
// are GATEWAY_TIMEOUT.
Expand Down Expand Up @@ -344,7 +364,12 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {
"response must not contain L5D_PROXY_ERROR header"
);

bg.await.expect("background task failed");
// Wait for all of the background tasks to complete, panicking if any returned an error.
bg.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.expect("background task failed");
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -386,7 +411,7 @@ async fn h2_response_meshed_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -430,7 +455,7 @@ async fn h2_response_unmeshed_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -473,7 +498,7 @@ async fn grpc_meshed_response_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -518,7 +543,7 @@ async fn grpc_unmeshed_response_error_header() {
// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
// to the mock implementation and has no significance to the test.
let _ = bg.await;
let _ = bg.join_all().await;
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -527,9 +552,7 @@ async fn grpc_response_class() {

// Build a mock connector serves a gRPC server that returns errors.
let connect = {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http2_only(true);
let server = hyper::server::conn::http2::Builder::new(TracingExecutor);
support::connect().endpoint_fn_boxed(
Target::addr(),
grpc_status_server(server, tonic::Code::Unknown),
Expand Down Expand Up @@ -606,9 +629,8 @@ async fn grpc_response_class() {
}

#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn hello_server(
http: hyper::server::conn::Http,
server: hyper::server::conn::http1::Builder,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
let span = tracing::info_span!("hello_server", ?endpoint);
Expand All @@ -620,7 +642,8 @@ fn hello_server(
Ok::<_, io::Error>(Response::new(Body::from("Hello world!")))
});
tokio::spawn(
http.serve_connection(server_io, hello_svc)
server
.serve_connection(server_io, hello_svc)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
Expand All @@ -630,7 +653,7 @@ fn hello_server(
#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn grpc_status_server(
http: hyper::server::conn::Http,
server: hyper::server::conn::http2::Builder<TracingExecutor>,
status: tonic::Code,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
Expand All @@ -639,26 +662,30 @@ fn grpc_status_server(
tracing::info!("mock connecting");
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(
http.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert("grpc-status", (status as u32).to_string().parse().unwrap());
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
server
.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert(
"grpc-status",
(status as u32).to_string().parse().unwrap(),
);
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
}
Expand All @@ -675,10 +702,7 @@ fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
}

#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn connect_timeout(
http: hyper::server::conn::Http,
) -> Box<dyn FnMut(Remote<ServerAddr>) -> ConnectFuture + Send> {
fn connect_timeout() -> Box<dyn FnMut(Remote<ServerAddr>) -> ConnectFuture + Send> {
Box::new(move |endpoint| {
let span = tracing::info_span!("connect_timeout", ?endpoint);
Box::pin(
Expand Down
Loading
Loading