diff --git a/.builder/action/local-server-setup.py b/.builder/action/local-server-setup.py
index 26c52df74..fbca7c627 100644
--- a/.builder/action/local-server-setup.py
+++ b/.builder/action/local-server-setup.py
@@ -30,7 +30,7 @@ def run(self, env):
         # Okay to fail, and if it fails, you will know when you enable the localhost tests.
         # We don't need it to succeed on every platform we have.
         result = self.env.shell.exec(python_path,
-                                     '-m', 'pip', 'install', 'h2')
+                                     '-m', 'pip', 'install', 'h11', 'h2', 'trio')
         if result.returncode != 0:
             print(
                 "Could not install python HTTP/2 server." +
@@ -38,11 +38,12 @@ def run(self, env):
             return
 
         base_dir = os.path.dirname(os.path.realpath(__file__))
-        dir = os.path.join(base_dir, "..", "..", "tests", "py_localhost")
+        dir = os.path.join(base_dir, "..", "..", "tests", "mock_server")
         os.chdir(dir)
 
-        p_server = subprocess.Popen([python_path, "server.py"])
-        p_non_tls_server = subprocess.Popen([python_path, "non_tls_server.py"])
+        p_server = subprocess.Popen([python_path, "h2tls_mock_server.py"])
+        p_non_tls_server = subprocess.Popen([python_path, "h2non_tls_server.py"])
+        p_h11_server = subprocess.Popen([python_path, "h11mock_server.py"])
 
         @atexit.register
         def close_local_server():
diff --git a/tests/mock_server/README.md b/tests/mock_server/README.md
new file mode 100644
index 000000000..c93f6af19
--- /dev/null
+++ b/tests/mock_server/README.md
@@ -0,0 +1,128 @@
+# HTTP/2 Local server
+
+Local server based on [python-hyper/h2](https://github.com/python-hyper/h2).
+
+## How to run the server
+
+Python 3.5+ required.
+
+* Install the hyper h2 Python module: `python3 -m pip install h2`
+
+### TLS server
+
+* The code is based on the [example](https://github.com/python-hyper/h2/blob/master/examples/asyncio/asyncio-server.py) from the hyper h2 server.
+* Have the cert/key ready. The script currently uses `../resources/unittests.crt`; you can either run the script from within this directory, which resolves the certificate and key via that relative path, or use your own and change the code accordingly.
+* Run the server: `python3 ./h2tls_mock_server.py`.
+
+#### Endpoints
+
+##### `/echo` - Echo endpoint (default)
+
+Echoes back request headers and body as JSON.
+
+```bash
+curl -k -v -H "foo:bar" https://localhost:3443/echo
+```
+#### Special headers
+
+##### `/echo` with `x-repeat-data` header - Download test
+
+Sends a repeated test pattern of the specified size (in bytes).
+
+```bash
+# Download 1MB of repeated data
+curl -k -v -H "x-repeat-data: 1000000" https://localhost:3443/echo
+```
+
+##### `/echo` with `x-repeat-data` + `x-slow-response` headers - Slow connection test
+
+Sends repeated data throttled to ~900 bytes/sec (for timeout testing).
+
+```bash
+# Download 5MB slowly at default speed (900 bytes/sec)
+curl -k -v -H "x-repeat-data: 5000000" -H "x-slow-response: true" https://localhost:3443/echo
+```
+
+##### `/echo` with custom throughput - Custom speed test
+
+Override the default throughput with the `x-throughput-bps` header.
+
+```bash
+# Download 5MB at 500 bytes/sec
+curl -k -v -H "x-repeat-data: 5000000" -H "x-slow-response: true" -H "x-throughput-bps: 500" https://localhost:3443/echo
+```
+
+##### `/echo` with `x-upload-test` header - Upload test
+
+Returns the byte count of the uploaded body without echoing the body content.
+
+```bash
+# Upload data and get byte count
+curl -k -v -X PUT -H "x-upload-test: true" -d "test data" https://localhost:3443/echo
+```
+
+##### `/echo` with `x-expect-status` header - Custom status code
+
+Returns the specified HTTP status code.
+
+```bash
+# Get a 500 status code
+curl -k -v -H "x-expect-status: 500" https://localhost:3443/echo
+```
+
+##### Any other path
+
+Returns 404 Not Found.
+
+### Non-TLS server
+
+- The code is based on the non-TLS [example](http://python-hyper.org/projects/h2/en/stable/basic-usage.html) from the hyper h2 server.
+- Run the server: `python3 ./h2non_tls_server.py`.
+- To verify the server is running correctly, run `curl -v --http2-prior-knowledge http://localhost:3280` and check the result.
+
+# HTTP/1.1 Local server
+
+## Requirements
+
+Install the required Python dependencies:
+
+```bash
+pip install trio h11
+```
+
+Or using pip3:
+
+```bash
+pip3 install trio h11
+```
+
+## Running the Server
+
+### Basic Usage (HTTP + HTTPS)
+
+Run both HTTP (port 80) and HTTPS (port 443) servers:
+
+```bash
+sudo python3 h11mock_server.py
+```
+
+Note: `sudo` is required for ports 80 and 443 on most systems.
+
+### Test Mode (Custom Port)
+
+Run on a custom port without sudo:
+
+```bash
+TEST_PORT=8080 python3 h11mock_server.py
+```
+
+**Important**: Since this uses a self-signed certificate, clients must disable peer verification.
+
+## Endpoints
+
+- **Any path**: Echoes request body as JSON
+- **/response-headers?HeaderName=value**: Adds custom headers to the response based on query parameters
+
+## Stopping the Server
+
+Press `Ctrl+C` to gracefully shut down the server.
diff --git a/tests/mock_server/h11mock_server.py b/tests/mock_server/h11mock_server.py
new file mode 100644
index 000000000..8ef8a860f
--- /dev/null
+++ b/tests/mock_server/h11mock_server.py
@@ -0,0 +1,262 @@
+#!/usr/bin/env python3
+import json
+from itertools import count
+import os
+from urllib.parse import quote
+
+import trio
+import h11
+
+MAX_RECV = 2**16
+TIMEOUT = 10
+
+
+class TrioHTTPWrapper:
+    _next_id = count()
+
+    def __init__(self, stream):
+        self.stream = stream
+        self.conn = h11.Connection(h11.SERVER)
+        self._obj_id = next(TrioHTTPWrapper._next_id)
+
+    async def send(self, event):
+        assert type(event) is not h11.ConnectionClosed
+        data = self.conn.send(event)
+        try:
+            await self.stream.send_all(data)
+        except BaseException:
+            self.conn.send_failed()
+            raise
+
+    async def _read_from_peer(self):
+        if self.conn.they_are_waiting_for_100_continue:
+            self.info("Sending 100 Continue")
+            go_ahead = h11.InformationalResponse(
+                status_code=100, headers=self.basic_headers()
+            )
+            await self.send(go_ahead)
+        try:
+            data = await self.stream.receive_some(MAX_RECV)
+            # URL-encode non-ASCII characters in the request line
+            if data and b'\r\n' in data:
+                request_line_end = data.index(b'\r\n')
+                request_line = data[:request_line_end]
+                rest = data[request_line_end:]
+
+                try:
+                    # Try to decode and check if there are non-ASCII chars
+                    request_line_str = request_line.decode('ascii')
+                except UnicodeDecodeError:
+                    # Has non-ASCII, need to URL-encode
+                    parts = request_line.split(b' ')
+                    if len(parts) == 3:
+                        method, target, version = parts
+                        # URL-encode the target
+                        target_str = target.decode('utf-8', errors='replace')
+                        # Only encode the path part, preserve the structure
+                        if '?' in target_str:
+                            path, query = target_str.split('?', 1)
+                            # Encode each query parameter value
+                            encoded_query_parts = []
+                            for param in query.split('&'):
+                                if '=' in param:
+                                    key, value = param.split('=', 1)
+                                    encoded_value = quote(value, safe='')
+                                    encoded_query_parts.append(f"{key}={encoded_value}")
+                                else:
+                                    encoded_query_parts.append(param)
+                            target_str = path + '?'
+ '&'.join(encoded_query_parts) + + request_line = b' '.join([method, target_str.encode('ascii'), version]) + data = request_line + rest + except (ConnectionError, trio.BrokenResourceError, trio.ClosedResourceError): + data = b"" + self.conn.receive_data(data) + + async def next_event(self): + while True: + event = self.conn.next_event() + if event is h11.NEED_DATA: + await self._read_from_peer() + continue + return event + + async def shutdown_and_clean_up(self): + try: + if hasattr(self.stream, 'send_eof'): + await self.stream.send_eof() + except (trio.BrokenResourceError, AttributeError): + pass + + with trio.move_on_after(TIMEOUT): + try: + while True: + got = await self.stream.receive_some(MAX_RECV) + if not got: + break + except (trio.BrokenResourceError, trio.ClosedResourceError): + pass + + try: + await self.stream.aclose() + except (trio.BrokenResourceError, trio.ClosedResourceError): + pass + + def basic_headers(self): + return [("Server", "echo-server")] + + def info(self, *args): + print(f"{self._obj_id}:", *args) + + +async def http_serve(stream): + wrapper = TrioHTTPWrapper(stream) + wrapper.info("Got new connection") + while True: + assert wrapper.conn.states == {h11.CLIENT: h11.IDLE, h11.SERVER: h11.IDLE} + + try: + with trio.fail_after(TIMEOUT): + wrapper.info("Server main loop waiting for request") + event = await wrapper.next_event() + wrapper.info("Server main loop got event:", event) + if type(event) is h11.Request: + await send_echo_response(wrapper, event) + except Exception as exc: + wrapper.info(f"Error during response handler: {exc!r}") + await maybe_send_error_response(wrapper, exc) + + if wrapper.conn.our_state is h11.MUST_CLOSE: + wrapper.info("connection is not reusable, so shutting down") + await wrapper.shutdown_and_clean_up() + return + else: + try: + wrapper.info("trying to re-use connection") + wrapper.conn.start_next_cycle() + except h11.ProtocolError: + states = wrapper.conn.states + wrapper.info("unexpected state", states, "-- bailing out") + await maybe_send_error_response( + wrapper, RuntimeError(f"unexpected state {states}") + ) + await wrapper.shutdown_and_clean_up() + return + + +async def send_simple_response(wrapper, status_code, content_type, body): + wrapper.info("Sending", status_code, "response with", len(body), "bytes") + headers = wrapper.basic_headers() + headers.append(("Content-Type", content_type)) + headers.append(("Content-Length", str(len(body)))) + res = h11.Response(status_code=status_code, headers=headers) + await wrapper.send(res) + await wrapper.send(h11.Data(data=body)) + await wrapper.send(h11.EndOfMessage()) + + +async def maybe_send_error_response(wrapper, exc): + wrapper.info("trying to send error response...") + if wrapper.conn.our_state not in {h11.IDLE, h11.SEND_RESPONSE}: + wrapper.info("...but I can't, because our state is", wrapper.conn.our_state) + return + try: + if isinstance(exc, h11.RemoteProtocolError): + status_code = exc.error_status_hint + elif isinstance(exc, trio.TooSlowError): + status_code = 408 + else: + status_code = 500 + body = str(exc).encode("utf-8") + await send_simple_response( + wrapper, status_code, "text/plain; charset=utf-8", body + ) + except Exception as exc: + wrapper.info("error while sending error response:", exc) + + +async def send_echo_response(wrapper, request): + wrapper.info("Preparing echo response") + + body_data = b"" + while True: + event = await wrapper.next_event() + if type(event) is h11.EndOfMessage: + break + assert type(event) is h11.Data + body_data += event.data + + 
target = request.target if isinstance(request.target, bytes) else request.target.encode() + target_str = target.decode("utf-8", errors="replace") + + # Check if this is the /404 endpoint + if target_str.startswith("/404"): + status_code = 404 + else: + status_code = 200 + + response_json = {"data": body_data.decode("utf-8")} + response_body = json.dumps(response_json, indent=4).encode("utf-8") + + headers = wrapper.basic_headers() + headers.append(("Content-Type", "application/json; charset=utf-8")) + headers.append(("Content-Length", str(len(response_body)))) + + for header_name, header_value in request.headers: + echo_name = b"Echo-" + header_name if isinstance(header_name, bytes) else f"Echo-{header_name}".encode() + echo_value = header_value if isinstance(header_value, bytes) else str(header_value).encode() + headers.append((echo_name, echo_value)) + + res = h11.Response(status_code=status_code, headers=headers) + await wrapper.send(res) + await wrapper.send(h11.Data(data=response_body)) + await wrapper.send(h11.EndOfMessage()) + + +async def serve(port): + print(f"listening on http://localhost:{port}") + try: + await trio.serve_tcp(http_serve, port) + except KeyboardInterrupt: + print("KeyboardInterrupt - shutting down") + + +async def serve_ssl(port, cert_file=os.path.join(os.path.dirname(__file__), + "../resources/unittests.crt"), key_file=os.path.join(os.path.dirname(__file__), + "../resources/unittests.key")): + import ssl + + script_dir = os.path.dirname(os.path.abspath(__file__)) + cert_path = os.path.join(script_dir, cert_file) + key_path = os.path.join(script_dir, key_file) + + if not os.path.exists(cert_path) or not os.path.exists(key_path): + print(f"Warning: SSL certificates not found at {cert_path} and {key_path}") + print(f"Skipping HTTPS server on port {port}") + print(f"To enable HTTPS, run: openssl req -x509 -newkey rsa:2048 -keyout {key_file} -out {cert_file} -days 365 -nodes -subj '/C=US/ST=WA/L=Seattle/O=Test/CN=localhost'") + return + + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(cert_path, key_path) + + print(f"listening on https://localhost:{port}") + try: + await trio.serve_ssl_over_tcp(http_serve, port, ssl_context) + except KeyboardInterrupt: + print("KeyboardInterrupt - shutting down") + + +async def main(): + test_port = os.environ.get('TEST_PORT') + + if test_port: + print(f"Running in test mode on port {test_port}") + await serve(int(test_port)) + else: + async with trio.open_nursery() as nursery: + nursery.start_soon(serve, 80) + nursery.start_soon(serve_ssl, 443) + + +if __name__ == "__main__": + trio.run(main) diff --git a/tests/py_localhost/non_tls_server.py b/tests/mock_server/h2non_tls_server.py similarity index 100% rename from tests/py_localhost/non_tls_server.py rename to tests/mock_server/h2non_tls_server.py diff --git a/tests/py_localhost/server.py b/tests/mock_server/h2tls_mock_server.py similarity index 72% rename from tests/py_localhost/server.py rename to tests/mock_server/h2tls_mock_server.py index 8c578a814..339bfc1ef 100644 --- a/tests/py_localhost/server.py +++ b/tests/mock_server/h2tls_mock_server.py @@ -8,9 +8,22 @@ A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+. -This example demonstrates handling requests with bodies, as well as handling -those without. In particular, it demonstrates the fact that DataReceived may -be called multiple times, and that applications must handle that possibility. +Supported endpoints: + +1. 
GET /echo (default) + - Echoes back request headers and body as JSON + +2. GET /echo with x-repeat-data header + - x-repeat-data: - Sends repeated test pattern of specified size + - Example: x-repeat-data: 1000000 (sends 1MB) + +3. GET /echo with x-repeat-data + x-slow-response headers + - x-slow-response: true - Throttles response to ~900 bytes/sec (for timeout testing) + - x-throughput-bps: - Optional: Override throughput (default 900) + - Example: x-repeat-data: 5000000, x-slow-response: true, x-throughput-bps: 500 + +4. Any other path + - Returns 404 Not Found """ import asyncio import io @@ -94,28 +107,79 @@ def request_received(self, headers: List[Tuple[str, str]], stream_id: int): int(2147483647/2), stream_id) self.raw_headers = headers headers = collections.OrderedDict(headers) - path = headers[':path'] - method = headers[':method'] - if method == "PUT" or method == "POST": - self.file_path = os.path.join(os.path.curdir, path[1:]) - if os.path.exists(self.file_path): - os.remove(self.file_path) # Store off the request data. request_data = RequestData(headers, io.BytesIO()) self.stream_data[stream_id] = request_data def handle_request_echo(self, stream_id: int, request_data: RequestData): + """ + Handle /echo endpoint with optional special headers: + - x-repeat-data: - Triggers send_repeat_data() with specified length + - x-slow-response: true - Triggers send_slow_repeat_data() instead (requires x-repeat-data) + - x-throughput-bps: - Override throughput for slow response (default 900) + + Without special headers, echoes request headers and body as JSON. + """ + headers_dict = dict(self.raw_headers) + + expect_status = headers_dict.get('x-expect-status') + if expect_status: + response_headers = [(':status', expect_status)] + self.conn.send_headers(stream_id, response_headers, end_stream=True) + return + + # Check for x-repeat-data header + repeat_data_header = headers_dict.get('x-repeat-data') + if repeat_data_header: + try: + length = int(repeat_data_header) + response_headers = [(':status', '200')] + self.conn.send_headers(stream_id, response_headers, end_stream=False) + + # Check for slow response + if headers_dict.get('x-slow-response') == 'true': + # Check for custom throughput + throughput_header = headers_dict.get('x-throughput-bps') + if throughput_header: + self.out_bytes_per_second = int(throughput_header) + asyncio.ensure_future(self.send_slow_repeat_data(length, stream_id)) + else: + asyncio.ensure_future(self.send_repeat_data(length, stream_id)) + return + except ValueError: + pass # Fall through to echo behavior + + # Check for upload test (don't echo body, just return byte count) + if headers_dict.get('x-upload-test') == 'true': + body_bytes = request_data.data.getvalue() + data = json.dumps({"bytes": len(body_bytes)}, indent=4).encode("utf8") + response_headers = [(':status', '200'), ('content-length', str(len(data)))] + self.conn.send_headers(stream_id, response_headers, end_stream=False) + asyncio.ensure_future(self.send_data(data, stream_id)) + return + + # Default echo behavior response_headers = [(':status', '200')] + # Filter out headers that shouldn't be echoed back + skip_headers = {'content-length', 'content-encoding', 'transfer-encoding'} for i in self.raw_headers: - # Response headers back and exclude pseudo headers - if i[0][0] != ':': + # Response headers back and exclude pseudo headers and problematic headers + if i[0][0] != ':' and i[0].lower() not in skip_headers: response_headers.append(i) - body = request_data.data.getvalue().decode('utf-8') + + 
body_bytes = request_data.data.getvalue() + + body = body_bytes.decode('utf-8') + data = json.dumps( - {"body": body}, indent=4 + {"body": body, "bytes": len(body_bytes)}, indent=4, ).encode("utf8") - self.conn.send_headers(stream_id, response_headers) + + # Add correct content-length for our response + response_headers.append(('content-length', str(len(data)))) + + self.conn.send_headers(stream_id, response_headers, end_stream=False) asyncio.ensure_future(self.send_data(data, stream_id)) def stream_complete(self, stream_id: int): @@ -129,29 +193,10 @@ def stream_complete(self, stream_id: int): return path = request_data.headers[':path'] - method = request_data.headers[':method'] - if path == '/expect500': - self.conn.send_headers(stream_id, [(':status', '500')]) - asyncio.ensure_future(self.send_data(b"Internal Server Error", stream_id)) - elif method == "PUT" or method == "POST": - self.conn.send_headers(stream_id, [(':status', '200')]) - asyncio.ensure_future(self.send_data( - str(self.num_sentence_received[stream_id]).encode(), stream_id)) - elif path == '/echo': + if path == '/echo': self.handle_request_echo(stream_id, request_data) - elif path == '/downloadTest': - length = self.download_test_length - self.conn.send_headers( - stream_id, [(':status', '200'), ('content-length', str(length))]) - asyncio.ensure_future(self.send_repeat_data(length, stream_id)) - elif path == '/slowConnTest': - length = int(self.download_test_length/1000) - self.conn.send_headers( - stream_id, [(':status', '200'), ('content-length', str(length))]) - asyncio.ensure_future( - self.send_slow_repeat_data(length, stream_id)) else: - self.conn.send_headers(stream_id, [(':status', '404')]) + self.conn.send_headers(stream_id, [(':status', '404')], end_stream=False) asyncio.ensure_future(self.send_data(b"Not Found", stream_id)) def receive_data(self, data: bytes, stream_id: int, flow_controlled_length: int): @@ -173,16 +218,7 @@ def receive_data(self, data: bytes, stream_id: int, flow_controlled_length: int) stream_id, error_code=ErrorCodes.PROTOCOL_ERROR ) else: - method = stream_data.headers[':method'] - if method == "PUT" or method == "POST": - if stream_id in self.num_sentence_received: - self.num_sentence_received[stream_id] = self.num_sentence_received[stream_id] + \ - len(data) - else: - self.num_sentence_received[stream_id] = len(data) - - else: - stream_data.data.write(data) + stream_data.data.write(data) def stream_reset(self, stream_id): """ @@ -225,7 +261,9 @@ async def send_data(self, data, stream_id): async def send_repeat_data(self, length, stream_id): """ - Send data with length according to the flow control rules. + Send repeated test pattern data of specified length. + Respects HTTP/2 flow control rules. + Triggered by x-repeat-data header on /echo endpoint. """ while length > 0: while self.conn.local_flow_control_window(stream_id) < 1: @@ -259,7 +297,9 @@ async def send_repeat_data(self, length, stream_id): async def send_slow_repeat_data(self, length, stream_id): """ - Send data with length slowly (less than 1000 bytes per second) + Send repeated test pattern data slowly (throttled to out_bytes_per_second, default 900). + Used for timeout and slow connection testing. + Triggered by x-repeat-data + x-slow-response headers on /echo endpoint. 
""" while length > 0: while self.conn.local_flow_control_window(stream_id) < 1: diff --git a/tests/py_localhost/README.md b/tests/py_localhost/README.md deleted file mode 100644 index 7a4b37940..000000000 --- a/tests/py_localhost/README.md +++ /dev/null @@ -1,44 +0,0 @@ -# Local server - -Local server based on [python-hyper/h2](https://github.com/python-hyper/h2). - -## How to run the server - -Python 3.5+ required. - -* Install hyper h2 python module. `python3 -m pip install h2` - -### TLS server - -* The code is based the [example](https://github.com/python-hyper/h2/blob/master/examples/asyncio/asyncio-server.py) from hyper h2 server. -* Have the cert/key ready. The script now using `../resources/unittests.crt`, you can either just run the script within this directory, which will find the certificates and key from the related path, or you can use your own and change the code coordinately. -* Run python. `python3 ./server.py`. - -#### Echo - -* Minor changed based on the example to response the headers of requests back within the headers from `/echo`. -* To test the server runs correctly, you can do `curl -k -v -H "foo:bar" https://localhost:3443/echo` and check the result. - -#### Download test - -* To test download, when `:path` is `/downloadTest`, server will response a repeated string with length `self.download_test_length`, which is 2,500,000,000 now. It will be repeats of sting "This is CRT HTTP test." -* To test the server runs correctly, you can do `curl -k -v -H "foo:bar" https://localhost:3443/downloadTest` and check the result. - -#### Slow Connection Test - -* Simulate a slow connection when `:path` is `/slowConnTest`. The speed is controlled by `out_bytes_per_second`. Default speed is 900 B/s, which will send 900 bytes of data and wait a sec to send new 900 bytes of data. - -#### Upload test - -* To test upload, when `:method` is `POST` or `PUT`, server will response the length received from response body -* To test the server runs correctly, you can do `curl -k -X POST -F'data=@upload_test.txt' https://localhost:3443/upload_test` where `upload_test.txt` is file to upload. - -#### expect500 - -* The server will always return `500` for `:status`, when the `:path` is `/expect500` - -### Non-TLS server - -* The code is based the non-tls [example](http://python-hyper.org/projects/h2/en/stable/basic-usage.html) from hyper h2 server. -* Run python. `python3 ./non_tls_server.py`. -* To test the server runs correctly, you can do `curl -v --http2-prior-knowledge http://localhost:3280` and check the result. 
diff --git a/tests/test_localhost_integ.c b/tests/test_localhost_integ.c index b47bd524c..3c6616853 100644 --- a/tests/test_localhost_integ.c +++ b/tests/test_localhost_integ.c @@ -394,9 +394,16 @@ static int s_tester_on_put_body(struct aws_http_stream *stream, const struct aws (void)stream; (void)user_data; - struct aws_string *content_length_header_str = aws_string_new_from_cursor(s_tester.alloc, data); - s_tester.num_sen_received = (uint64_t)strtoull((const char *)content_length_header_str->bytes, NULL, 10); - aws_string_destroy(content_length_header_str); + /* Response is JSON as string: "{\n \"bytes\": 2500000000\n}" - extract the number */ + struct aws_byte_cursor bytes_key = aws_byte_cursor_from_c_str("\"bytes\": "); + struct aws_byte_cursor found; + + if (aws_byte_cursor_find_exact(data, &bytes_key, &found) == AWS_OP_SUCCESS) { + struct aws_byte_cursor value_cursor = *data; + value_cursor.ptr = found.ptr + bytes_key.len; + value_cursor.len = data->len - (size_t)(found.ptr - data->ptr) - bytes_key.len - 2; + aws_byte_cursor_utf8_parse_u64(value_cursor, &s_tester.num_sen_received); + } return AWS_OP_SUCCESS; } @@ -425,7 +432,7 @@ static int s_localhost_integ_h2_upload_stress(struct aws_allocator *allocator, v struct aws_http_header request_headers_src[] = { DEFINE_HEADER(":method", "PUT"), DEFINE_HEADER(":scheme", "https"), - DEFINE_HEADER(":path", "/upload_test.txt"), + DEFINE_HEADER(":path", "/echo"), { .name = aws_byte_cursor_from_c_str(":authority"), .value = host_name, @@ -434,6 +441,7 @@ static int s_localhost_integ_h2_upload_stress(struct aws_allocator *allocator, v .name = aws_byte_cursor_from_c_str("content_length"), .value = aws_byte_cursor_from_c_str(content_length_sprintf_buffer), }, + DEFINE_HEADER("x-upload-test", "true"), }; struct aws_http_message *request = aws_http2_message_new_request(allocator); aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); @@ -491,14 +499,21 @@ static int s_localhost_integ_h2_download_stress(struct aws_allocator *allocator, /* wait for connection connected */ ASSERT_SUCCESS(s_wait_on_connection_connected(&s_tester)); + char length_sprintf_buffer[128] = ""; + snprintf(length_sprintf_buffer, sizeof(length_sprintf_buffer), "%zu", length); + struct aws_http_header request_headers_src[] = { DEFINE_HEADER(":method", "GET"), DEFINE_HEADER(":scheme", "https"), - DEFINE_HEADER(":path", "/downloadTest"), + DEFINE_HEADER(":path", "/echo"), { .name = aws_byte_cursor_from_c_str(":authority"), .value = host_name, }, + { + .name = aws_byte_cursor_from_c_str("x-repeat-data"), + .value = aws_byte_cursor_from_c_str(length_sprintf_buffer), + }, }; struct aws_http_message *request = aws_http2_message_new_request(allocator); ASSERT_NOT_NULL(request); diff --git a/tests/test_stream_manager.c b/tests/test_stream_manager.c index ff992a17f..bda16b9bb 100644 --- a/tests/test_stream_manager.c +++ b/tests/test_stream_manager.c @@ -1269,7 +1269,7 @@ TEST_CASE(h2_sm_closing_before_connection_acquired) { TEST_CASE(localhost_integ_h2_sm_close_connection_on_server_error) { (void)ctx; /* server that will return 500 status code all the time. 
*/ - struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:3443/expect500"); + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:3443/echo"); struct sm_tester_options options = { .max_connections = 1, .max_concurrent_streams_per_connection = 10, @@ -1278,11 +1278,42 @@ TEST_CASE(localhost_integ_h2_sm_close_connection_on_server_error) { .close_connection_on_server_error = true, }; ASSERT_SUCCESS(s_tester_init(&options)); + + struct aws_http_message *request = aws_http2_message_new_request(s_tester.allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + { + .name = aws_byte_cursor_from_c_str(":scheme"), + .value = *aws_uri_scheme(&s_tester.endpoint), + }, + { + .name = aws_byte_cursor_from_c_str(":path"), + .value = s_normalize_path(*aws_uri_path(&s_tester.endpoint)), + }, + { + .name = aws_byte_cursor_from_c_str(":authority"), + .value = *aws_uri_host_name(&s_tester.endpoint), + }, + DEFINE_HEADER("x-expect-status", "500"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .user_data = &s_tester, + .on_complete = s_sm_tester_on_stream_complete, + .on_destroy = s_sm_tester_on_stream_destroy, + }; int num_to_acquire = 50; - ASSERT_SUCCESS(s_sm_stream_acquiring(num_to_acquire)); + ASSERT_SUCCESS(s_sm_stream_acquiring_customize_request(num_to_acquire, &request_options)); + aws_http_message_release(request); + ASSERT_SUCCESS(s_wait_on_streams_completed_count(num_to_acquire)); - ASSERT_TRUE((int)s_tester.acquiring_stream_errors == 0); - ASSERT_TRUE((int)s_tester.stream_200_count == 0); + ASSERT_INT_EQUALS(0, (int)s_tester.acquiring_stream_errors); + ASSERT_INT_EQUALS(0, (int)s_tester.stream_200_count); + ASSERT_INT_EQUALS(num_to_acquire, s_tester.stream_status_not_200_count); return s_tester_clean_up(); } @@ -1339,10 +1370,13 @@ TEST_CASE(localhost_integ_h2_sm_acquire_stream_stress) { static int s_tester_on_put_body(struct aws_http_stream *stream, const struct aws_byte_cursor *data, void *user_data) { (void)user_data; (void)stream; - struct aws_string *content_length_header_str = aws_string_new_from_cursor(s_tester.allocator, data); - size_t num_received = (uint32_t)atoi((const char *)content_length_header_str->bytes); + /* Response is JSON: {"bytes": 2000} - extract the number */ + const char *bytes_key = "\"bytes\": "; + const char *json_str = (const char *)data->ptr; + const char *bytes_pos = strstr(json_str, bytes_key); + AWS_FATAL_ASSERT(bytes_pos != NULL); + size_t num_received = (size_t)atoll(bytes_pos + strlen(bytes_key)); AWS_FATAL_ASSERT(s_tester.length_sent == num_received); - aws_string_destroy(content_length_header_str); return AWS_OP_SUCCESS; } @@ -1369,6 +1403,7 @@ static int s_sm_stream_acquiring_with_body(int num_streams) { .name = aws_byte_cursor_from_c_str("content_length"), .value = aws_byte_cursor_from_c_str(content_length_sprintf_buffer), }, + DEFINE_HEADER("x-upload-test", "true"), }; for (int i = 0; i < num_streams; ++i) { /* TODO: Test the callback will always be fired asynced, as now the CM cannot ensure the callback happens @@ -1400,7 +1435,7 @@ static int s_sm_stream_acquiring_with_body(int num_streams) { /* Test that makes tons of real streams with body against local host */ TEST_CASE(localhost_integ_h2_sm_acquire_stream_stress_with_body) { (void)ctx; - 
struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:3443/upload_test"); + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:3443/echo"); enum aws_log_level log_level = AWS_LOG_LEVEL_DEBUG; struct sm_tester_options options = { .max_connections = 100, @@ -1425,7 +1460,7 @@ TEST_CASE(localhost_integ_h2_sm_acquire_stream_stress_with_body) { /* Test that connection monitor works properly with HTTP/2 stream manager */ TEST_CASE(localhost_integ_h2_sm_connection_monitor_kill_slow_connection) { (void)ctx; - struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:3443/slowConnTest"); + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:3443/echo"); struct aws_http_connection_monitoring_options monitor_opt = { .allowable_throughput_failure_interval_seconds = 1, .minimum_throughput_bytes_per_second = 1000, @@ -1439,7 +1474,42 @@ TEST_CASE(localhost_integ_h2_sm_connection_monitor_kill_slow_connection) { }; ASSERT_SUCCESS(s_tester_init(&options)); - ASSERT_SUCCESS(s_sm_stream_acquiring(1)); + struct aws_http_message *request = aws_http2_message_new_request(s_tester.allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + { + .name = aws_byte_cursor_from_c_str(":scheme"), + .value = *aws_uri_scheme(&s_tester.endpoint), + }, + { + .name = aws_byte_cursor_from_c_str(":path"), + .value = s_normalize_path(*aws_uri_path(&s_tester.endpoint)), + }, + { + .name = aws_byte_cursor_from_c_str(":authority"), + .value = *aws_uri_host_name(&s_tester.endpoint), + }, + DEFINE_HEADER("x-repeat-data", "5000000"), + DEFINE_HEADER("x-slow-response", "true"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .user_data = &s_tester, + .on_complete = s_sm_tester_on_stream_complete, + .on_destroy = s_sm_tester_on_stream_destroy, + }; + struct aws_http2_stream_manager_acquire_stream_options acquire_stream_option = { + .options = &request_options, + .callback = s_sm_tester_on_stream_acquired, + .user_data = &s_tester, + }; + aws_http2_stream_manager_acquire_stream(s_tester.stream_manager, &acquire_stream_option); + aws_http_message_release(request); + ASSERT_SUCCESS(s_wait_on_streams_completed_count(1)); /* Check the connection closed by connection monitor and the stream should completed with corresponding error */ ASSERT_UINT_EQUALS(s_tester.stream_completed_error_code, AWS_ERROR_HTTP_CONNECTION_CLOSED);