diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 77cc109ff6..e52d75bfaa 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -12,10 +12,10 @@ use linkerd_app_core::{ errors::respond::L5D_PROXY_ERROR, identity, io, metrics, proxy::http, - svc::{self, NewService, Param}, + svc::{self, http::TracingExecutor, NewService, Param}, tls, transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr}, - NameAddr, ProxyRuntime, + Error, NameAddr, ProxyRuntime, }; use linkerd_app_test::connect::ConnectFuture; use linkerd_tracing::test::trace_init; @@ -47,9 +47,7 @@ where #[tokio::test(flavor = "current_thread")] async fn unmeshed_http1_hello_world() { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); + let server = hyper::server::conn::http1::Builder::new(); #[allow(deprecated)] // linkerd/linkerd2#8733 let mut client = hyper::client::conn::Builder::new(); let _trace = trace_init(); @@ -82,15 +80,18 @@ async fn unmeshed_http1_hello_world() { let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] async fn downgrade_origin_form() { // Reproduces https://github.com/linkerd/linkerd2/issues/5298 - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); + let server = hyper::server::conn::http1::Builder::new(); #[allow(deprecated)] // linkerd/linkerd2#8733 let mut client = hyper::client::conn::Builder::new(); client.http2_only(true); @@ -126,14 +127,17 @@ async fn downgrade_origin_form() { let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] async fn downgrade_absolute_form() { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); + let server = hyper::server::conn::http1::Builder::new(); #[allow(deprecated)] // linkerd/linkerd2#8733 let mut client = hyper::client::conn::Builder::new(); client.http2_only(true); @@ -169,7 +173,12 @@ async fn downgrade_absolute_form() { let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -211,7 +220,12 @@ async fn http1_bad_gateway_meshed_response_error_header() { // logical error context is added. check_error_header(rsp.headers(), "server is not listening"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -252,7 +266,12 @@ async fn http1_bad_gateway_unmeshed_response() { "response must not contain L5D_PROXY_ERROR header" ); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -262,9 +281,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { // Build a mock connect that sleeps longer than the default inbound // connect timeout. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let server = hyper::server::conn::Http::new(); - let connect = support::connect().endpoint(Target::addr(), connect_timeout(server)); + let connect = support::connect().endpoint(Target::addr(), connect_timeout()); // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. @@ -299,7 +316,12 @@ async fn http1_connect_timeout_meshed_response_error_header() { // logical error context is added. check_error_header(rsp.headers(), "connect timed out after 1s"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -309,9 +331,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { // Build a mock connect that sleeps longer than the default inbound // connect timeout. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let server = hyper::server::conn::Http::new(); - let connect = support::connect().endpoint(Target::addr(), connect_timeout(server)); + let connect = support::connect().endpoint(Target::addr(), connect_timeout()); // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. @@ -344,7 +364,12 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { "response must not contain L5D_PROXY_ERROR header" ); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -386,7 +411,7 @@ async fn h2_response_meshed_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -430,7 +455,7 @@ async fn h2_response_unmeshed_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -473,7 +498,7 @@ async fn grpc_meshed_response_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -518,7 +543,7 @@ async fn grpc_unmeshed_response_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -527,9 +552,7 @@ async fn grpc_response_class() { // Build a mock connector serves a gRPC server that returns errors. let connect = { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http2_only(true); + let server = hyper::server::conn::http2::Builder::new(TracingExecutor); support::connect().endpoint_fn_boxed( Target::addr(), grpc_status_server(server, tonic::Code::Unknown), @@ -606,9 +629,8 @@ async fn grpc_response_class() { } #[tracing::instrument] -#[allow(deprecated)] // linkerd/linkerd2#8733 fn hello_server( - http: hyper::server::conn::Http, + server: hyper::server::conn::http1::Builder, ) -> impl Fn(Remote) -> io::Result { move |endpoint| { let span = tracing::info_span!("hello_server", ?endpoint); @@ -620,7 +642,8 @@ fn hello_server( Ok::<_, io::Error>(Response::new(Body::from("Hello world!"))) }); tokio::spawn( - http.serve_connection(server_io, hello_svc) + server + .serve_connection(server_io, hello_svc) .in_current_span(), ); Ok(io::BoxedIo::new(client_io)) @@ -630,7 +653,7 @@ fn hello_server( #[tracing::instrument] #[allow(deprecated)] // linkerd/linkerd2#8733 fn grpc_status_server( - http: hyper::server::conn::Http, + server: hyper::server::conn::http2::Builder, status: tonic::Code, ) -> impl Fn(Remote) -> io::Result { move |endpoint| { @@ -639,26 +662,30 @@ fn grpc_status_server( tracing::info!("mock connecting"); let (client_io, server_io) = support::io::duplex(4096); tokio::spawn( - http.serve_connection( - server_io, - hyper::service::service_fn(move |request: Request| async move { - tracing::info!(?request); - let (mut tx, rx) = Body::channel(); - tokio::spawn(async move { - let mut trls = ::http::HeaderMap::new(); - trls.insert("grpc-status", (status as u32).to_string().parse().unwrap()); - tx.send_trailers(trls).await - }); - Ok::<_, io::Error>( - http::Response::builder() - .version(::http::Version::HTTP_2) - .header("content-type", "application/grpc") - .body(rx) - .unwrap(), - ) - }), - ) - .in_current_span(), + server + .serve_connection( + server_io, + hyper::service::service_fn(move |request: Request| async move { + tracing::info!(?request); + let (mut tx, rx) = Body::channel(); + tokio::spawn(async move { + let mut trls = ::http::HeaderMap::new(); + trls.insert( + "grpc-status", + (status as u32).to_string().parse().unwrap(), + ); + tx.send_trailers(trls).await + }); + Ok::<_, io::Error>( + http::Response::builder() + .version(::http::Version::HTTP_2) + .header("content-type", "application/grpc") + .body(rx) + .unwrap(), + ) + }), + ) + .in_current_span(), ); Ok(io::BoxedIo::new(client_io)) } @@ -675,10 +702,7 @@ fn connect_error() -> impl Fn(Remote) -> io::Result { } #[tracing::instrument] -#[allow(deprecated)] // linkerd/linkerd2#8733 -fn connect_timeout( - http: hyper::server::conn::Http, -) -> Box) -> ConnectFuture + Send> { +fn connect_timeout() -> Box) -> ConnectFuture + Send> { Box::new(move |endpoint| { let span = tracing::info_span!("connect_timeout", ?endpoint); Box::pin( diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 1519a0bdfc..91bb4f53d5 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -2,11 +2,9 @@ use crate::{ app_core::{svc, Error}, io, ContextError, }; -use futures::FutureExt; use hyper::{body::HttpBody, Body}; -use std::future::Future; -use tokio::task::JoinHandle; -use tower::{util::ServiceExt, Service}; +use tokio::task::JoinSet; +use tower::ServiceExt; use tracing::Instrument; #[allow(deprecated)] // linkerd/linkerd2#8733 @@ -14,62 +12,46 @@ use hyper::client::conn::{Builder as ClientBuilder, SendRequest}; type BoxServer = svc::BoxTcp; -async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle>) { - let (client_io, server_io) = io::duplex(4096); - let f = server - .ready() - .await - .expect("proxy server failed to become ready") - .call(server_io); - - let proxy = async move { - let res = f.await.map_err(Into::into); - drop(server); - tracing::debug!("dropped server"); - tracing::info!(?res, "proxy serve task complete"); - res.map(|_| ()) - } - .instrument(tracing::info_span!("proxy")); - (client_io, tokio::spawn(proxy)) -} - +/// Connects a client and server, running a proxy between them. +/// +/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and +/// await a response, and (2) a [`JoinSet`] running background tasks. #[allow(deprecated)] // linkerd/linkerd2#8733 -async fn connect_client( +pub async fn connect_and_accept( client_settings: &mut ClientBuilder, - io: io::DuplexStream, -) -> (SendRequest, JoinHandle>) { + server: BoxServer, +) -> (SendRequest, JoinSet>) { + tracing::info!(settings = ?client_settings, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + let (client, conn) = client_settings - .handshake(io) + .handshake(client_io) .await .expect("Client must connect"); - let client_bg = conn - .map(|res| { - tracing::info!(?res, "Client background complete"); - res.map_err(Into::into) - }) - .instrument(tracing::info_span!("client_bg")); - (client, tokio::spawn(client_bg)) -} -#[allow(deprecated)] // linkerd/linkerd2#8733 -pub async fn connect_and_accept( - client_settings: &mut ClientBuilder, - server: BoxServer, -) -> (SendRequest, impl Future>) { - tracing::info!(settings = ?client_settings, "connecting client with"); - let (client_io, proxy) = run_proxy(server).await; - let (client, client_bg) = connect_client(client_settings, client_io).await; - let bg = async move { - proxy - .await - .expect("proxy background task panicked") - .map_err(ContextError::ctx("proxy background task failed"))?; - client_bg - .await - .expect("client background task panicked") - .map_err(ContextError::ctx("client background task failed"))?; - Ok(()) - }; + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server + .oneshot(server_io) + .await + .map_err(ContextError::ctx("proxy background task failed"))?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await + .map_err(ContextError::ctx("client background task failed")) + .map_err(Error::from)?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + (client, bg) }