diff --git a/source/connection_manager.c b/source/connection_manager.c index f8850f7f8..06007005c 100644 --- a/source/connection_manager.c +++ b/source/connection_manager.c @@ -1164,7 +1164,6 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect struct aws_http_connection_manager *manager = work->manager; - int representative_error = 0; size_t new_connection_failures = 0; /* @@ -1202,8 +1201,6 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect */ struct aws_array_list errors; AWS_ZERO_STRUCT(errors); - /* Even if we can't init this array, we still need to invoke error callbacks properly */ - bool push_errors = false; if (work->new_connections > 0) { AWS_LOGF_INFO( @@ -1211,20 +1208,21 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect "id=%p: Requesting %zu new connections from http", (void *)manager, work->new_connections); - push_errors = aws_array_list_init_dynamic(&errors, work->allocator, work->new_connections, sizeof(int)) == - AWS_ERROR_SUCCESS; + AWS_FATAL_ASSERT( + aws_array_list_init_dynamic(&errors, work->allocator, work->new_connections, sizeof(int)) == + AWS_OP_SUCCESS); } for (size_t i = 0; i < work->new_connections; ++i) { if (s_aws_http_connection_manager_new_connection(manager)) { ++new_connection_failures; - representative_error = aws_last_error(); - if (push_errors) { - AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &representative_error) == AWS_OP_SUCCESS); - } + int error = aws_last_error(); + AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &error) == AWS_OP_SUCCESS); } } + bool has_pending_acquisitions = false; + struct aws_connection_management_transaction pending_acquisitions_work; if (new_connection_failures > 0) { /* * We failed and aren't going to receive a callback, but the current state assumes we will receive @@ -1235,30 +1233,24 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect AWS_FATAL_ASSERT(manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] >= new_connection_failures); s_connection_manager_internal_ref_decrease(manager, AWS_HCMCT_PENDING_CONNECTIONS, new_connection_failures); - /* - * Rather than failing one acquisition for each connection failure, if there's at least one - * connection failure, we instead fail all excess acquisitions, since there's no pending - * connect that will necessarily resolve them. - * - * Try to correspond an error with the acquisition failure, but as a fallback just use the - * representative error. - */ - size_t i = 0; - while (manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS]) { - int error = representative_error; - if (i < aws_array_list_length(&errors)) { - aws_array_list_get_at(&errors, &error, i); - } - + for (size_t i = 0; i < new_connection_failures && manager->pending_acquisition_count > 0; i++) { + int error; + aws_array_list_get_at(&errors, &error, i); AWS_LOGF_DEBUG( AWS_LS_HTTP_CONNECTION_MANAGER, - "id=%p: Failing excess connection acquisition with error code %d", + "id=%p: Failing connection acquisition with error code %d", (void *)manager, (int)error); s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error, &work->completions); - ++i; } - + has_pending_acquisitions = + manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count < + manager->pending_acquisition_count; + if (has_pending_acquisitions) { + /* If there are pending acquisitions, schedule work again to try acquiring them */ + s_aws_connection_management_transaction_init(&pending_acquisitions_work, manager); + s_aws_http_connection_manager_build_transaction(&pending_acquisitions_work); + } aws_mutex_unlock(&manager->lock); } @@ -1266,13 +1258,20 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect * Step 4 - Perform acquisition callbacks */ s_aws_http_connection_manager_complete_acquisitions(&work->completions, work->allocator); - aws_array_list_clean_up(&errors); /* * Step 5 - Clean up work. Do this here rather than at the end of every caller. Destroy the manager if necessary */ s_aws_connection_management_transaction_clean_up(work); + + /* + * Step 6 - If some connection acquisition requests failed and we still have more pending acquisitions, try to + * acquire them. + */ + if (has_pending_acquisitions) { + s_aws_http_connection_manager_execute_transaction(&pending_acquisitions_work); + } } void aws_http_connection_manager_acquire_connection( @@ -1479,12 +1478,11 @@ static void s_cm_on_connection_ready_or_failed( work->connection_to_release = connection; } } else { - /* fail acquisition as one connection cannot be used any more */ - while (manager->pending_acquisition_count > - manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count) { + if (manager->pending_acquisition_count > 0) { + /* fail acquisition as connection acquire failed */ AWS_LOGF_DEBUG( AWS_LS_HTTP_CONNECTION_MANAGER, - "id=%p: Failing excess connection acquisition with error code %d", + "id=%p: Failing connection acquisition with error code %d", (void *)manager, (int)error_code); s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error_code, &work->completions); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index dbb2b7b7f..2b64ebf7f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -529,6 +529,8 @@ add_net_test_case(connection_manager_setup_shutdown) add_net_test_case(connection_manager_acquire_release_mix_synchronous) add_net_test_case(connection_manager_acquisition_timeout) add_net_test_case(connection_manager_connect_callback_failure) +add_net_test_case(test_connection_manager_connect_callback_async_failure) +add_net_test_case(test_connection_manager_connect_callback_async_with_immediate_failure) add_net_test_case(connection_manager_connect_immediate_failure) add_net_test_case(connection_manager_proxy_setup_shutdown) add_net_test_case(connection_manager_idle_culling_single) diff --git a/tests/test_connection_manager.c b/tests/test_connection_manager.c index 65de0b089..9fe82ae45 100644 --- a/tests/test_connection_manager.c +++ b/tests/test_connection_manager.c @@ -819,7 +819,7 @@ AWS_TEST_CASE( connection_manager_max_pending_acquisitions_with_vended_connections, s_test_connection_manager_max_pending_acquisitions_with_vended_connections); -static int s_aws_http_connection_manager_create_connection_sync_mock( +static int s_aws_http_connection_manager_create_connection_validate( const struct aws_http_client_connection_options *options) { struct cm_tester *tester = &s_tester; @@ -830,8 +830,6 @@ static int s_aws_http_connection_manager_create_connection_sync_mock( ASSERT_TRUE(aws_byte_cursor_eq_c_str(&interface_name, options->socket_options->network_interface_name)); } - size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1); - ASSERT_SUCCESS(aws_mutex_lock(&tester->lock)); tester->release_connection_fn = options->on_shutdown; ASSERT_SUCCESS(aws_mutex_unlock(&tester->lock)); @@ -847,7 +845,15 @@ static int s_aws_http_connection_manager_create_connection_sync_mock( ASSERT_UINT_EQUALS(options->proxy_options->connection_type, tester->verify_proxy_options->connection_type); } + return AWS_OP_SUCCESS; +} +static int s_aws_http_connection_manager_create_connection_sync_mock( + const struct aws_http_client_connection_options *options) { + s_aws_http_connection_manager_create_connection_validate(options); + struct cm_tester *tester = &s_tester; + struct mock_connection *connection = NULL; + size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1); if (next_connection_id < aws_array_list_length(&tester->mock_connections)) { aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id); @@ -868,6 +874,63 @@ static int s_aws_http_connection_manager_create_connection_sync_mock( return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN); } +struct connect_task_args { + void *user_data; + struct mock_connection *connection; + aws_http_on_client_connection_setup_fn *on_setup; +}; + +static void s_aws_http_connection_manager_connect_task( + struct aws_task *task, + void *user_data, + enum aws_task_status status) { + (void)status; + struct cm_tester *tester = &s_tester; + + struct connect_task_args *task_args = user_data; + struct mock_connection *connection = task_args->connection; + if (connection) { + if (connection->result == AWS_NCRT_SUCCESS) { + task_args->on_setup((struct aws_http_connection *)connection, AWS_ERROR_SUCCESS, task_args->user_data); + } else if (connection->result == AWS_NCRT_ERROR_VIA_CALLBACK) { + task_args->on_setup(NULL, AWS_ERROR_HTTP_UNKNOWN, task_args->user_data); + } else { + AWS_FATAL_ASSERT(0 && "Unexpected connection->result"); + } + } + + aws_mem_release(tester->allocator, task); + aws_mem_release(tester->allocator, task_args); +} + +static int s_aws_http_connection_manager_connect_async_mock(const struct aws_http_client_connection_options *options) { + s_aws_http_connection_manager_create_connection_validate(options); + struct cm_tester *tester = &s_tester; + + struct mock_connection *connection = NULL; + size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1); + if (next_connection_id < aws_array_list_length(&tester->mock_connections)) { + aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id); + } + + if (connection->result == AWS_NCRT_ERROR_FROM_CREATE) { + return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN); + } + + struct aws_task *task = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_task)); + struct connect_task_args *task_args = aws_mem_calloc(options->allocator, 1, sizeof(struct connect_task_args)); + task_args->connection = connection; + task_args->user_data = options->user_data; + task_args->on_setup = options->on_setup; + aws_task_init(task, s_aws_http_connection_manager_connect_task, task_args, "create_connection_task"); + + struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(tester->event_loop_group); + uint64_t now; + ASSERT_SUCCESS(aws_event_loop_current_clock_time(event_loop, &now)); + aws_event_loop_schedule_task_future(event_loop, task, now + 1000000000); + return AWS_OP_SUCCESS; +} + static void s_aws_http_connection_manager_release_connection_sync_mock(struct aws_http_connection *connection) { (void)connection; @@ -920,6 +983,17 @@ static struct aws_http_connection_manager_system_vtable s_synchronous_mocks = { .aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock, }; +static struct aws_http_connection_manager_system_vtable s_async_connect_mock = { + .aws_http_client_connect = s_aws_http_connection_manager_connect_async_mock, + .aws_http_connection_release = s_aws_http_connection_manager_release_connection_sync_mock, + .aws_http_connection_close = s_aws_http_connection_manager_close_connection_sync_mock, + .aws_http_connection_new_requests_allowed = s_aws_http_connection_manager_is_connection_available_sync_mock, + .aws_high_res_clock_get_ticks = aws_high_res_clock_get_ticks, + .aws_http_connection_get_channel = s_aws_http_connection_manager_connection_get_channel_sync_mock, + .aws_channel_thread_is_callers_thread = s_aws_http_connection_manager_is_callers_thread_sync_mock, + .aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock, +}; + static int s_test_connection_manager_with_network_interface_list(struct aws_allocator *allocator, void *ctx) { (void)ctx; struct aws_byte_cursor *interface_names_array = aws_mem_calloc(allocator, 3, sizeof(struct aws_byte_cursor)); @@ -1055,6 +1129,68 @@ static int s_test_connection_manager_connect_callback_failure(struct aws_allocat } AWS_TEST_CASE(connection_manager_connect_callback_failure, s_test_connection_manager_connect_callback_failure); +static int s_test_connection_manager_connect_callback_async_failure(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + int error_connections = 5; + int success_connections = 5; + struct cm_tester_options options = { + .allocator = allocator, + .max_connections = error_connections, + .mock_table = &s_async_connect_mock, + }; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false); + s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true); + + s_acquire_connections(error_connections + success_connections); + + ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections)); + + ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections); + ASSERT_SUCCESS(s_release_connections(success_connections, false)); + + ASSERT_SUCCESS(s_cm_tester_clean_up()); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE( + test_connection_manager_connect_callback_async_failure, + s_test_connection_manager_connect_callback_async_failure); + +static int s_test_connection_manager_connect_callback_async_with_immediate_failure( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + int error_connections = 5; + int success_connections = 5; + struct cm_tester_options options = { + .allocator = allocator, + .max_connections = error_connections + success_connections, + .mock_table = &s_async_connect_mock, + }; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true); + s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false); + + s_acquire_connections(error_connections + success_connections); + + ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections)); + + ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections); + ASSERT_SUCCESS(s_release_connections(success_connections, false)); + + ASSERT_SUCCESS(s_cm_tester_clean_up()); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE( + test_connection_manager_connect_callback_async_with_immediate_failure, + s_test_connection_manager_connect_callback_async_with_immediate_failure); + static int s_test_connection_manager_connect_immediate_failure(struct aws_allocator *allocator, void *ctx) { (void)ctx;