Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 30 additions & 32 deletions source/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,6 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect

struct aws_http_connection_manager *manager = work->manager;

int representative_error = 0;
size_t new_connection_failures = 0;

/*
Expand Down Expand Up @@ -1202,29 +1201,28 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
*/
struct aws_array_list errors;
AWS_ZERO_STRUCT(errors);
/* Even if we can't init this array, we still need to invoke error callbacks properly */
bool push_errors = false;

if (work->new_connections > 0) {
AWS_LOGF_INFO(
AWS_LS_HTTP_CONNECTION_MANAGER,
"id=%p: Requesting %zu new connections from http",
(void *)manager,
work->new_connections);
push_errors = aws_array_list_init_dynamic(&errors, work->allocator, work->new_connections, sizeof(int)) ==
AWS_ERROR_SUCCESS;
AWS_FATAL_ASSERT(
aws_array_list_init_dynamic(&errors, work->allocator, work->new_connections, sizeof(int)) ==
AWS_OP_SUCCESS);
}

for (size_t i = 0; i < work->new_connections; ++i) {
if (s_aws_http_connection_manager_new_connection(manager)) {
++new_connection_failures;
representative_error = aws_last_error();
if (push_errors) {
AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &representative_error) == AWS_OP_SUCCESS);
}
int error = aws_last_error();
AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &error) == AWS_OP_SUCCESS);
}
}

bool has_pending_acquisitions = false;
struct aws_connection_management_transaction pending_acquisitions_work;
if (new_connection_failures > 0) {
/*
* We failed and aren't going to receive a callback, but the current state assumes we will receive
Expand All @@ -1235,44 +1233,45 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
AWS_FATAL_ASSERT(manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] >= new_connection_failures);
s_connection_manager_internal_ref_decrease(manager, AWS_HCMCT_PENDING_CONNECTIONS, new_connection_failures);

/*
* Rather than failing one acquisition for each connection failure, if there's at least one
* connection failure, we instead fail all excess acquisitions, since there's no pending
* connect that will necessarily resolve them.
*
* Try to correspond an error with the acquisition failure, but as a fallback just use the
* representative error.
*/
size_t i = 0;
while (manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS]) {
int error = representative_error;
if (i < aws_array_list_length(&errors)) {
aws_array_list_get_at(&errors, &error, i);
}

for (size_t i = 0; i < new_connection_failures && manager->pending_acquisition_count > 0; i++) {
int error;
aws_array_list_get_at(&errors, &error, i);
AWS_LOGF_DEBUG(
AWS_LS_HTTP_CONNECTION_MANAGER,
"id=%p: Failing excess connection acquisition with error code %d",
"id=%p: Failing connection acquisition with error code %d",
(void *)manager,
(int)error);
s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error, &work->completions);
++i;
}

has_pending_acquisitions =
manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count <
manager->pending_acquisition_count;
if (has_pending_acquisitions) {
/* If there are pending acquisitions, schedule work again to try acquiring them */
s_aws_connection_management_transaction_init(&pending_acquisitions_work, manager);
s_aws_http_connection_manager_build_transaction(&pending_acquisitions_work);
}
aws_mutex_unlock(&manager->lock);
}

/*
* Step 4 - Perform acquisition callbacks
*/
s_aws_http_connection_manager_complete_acquisitions(&work->completions, work->allocator);

aws_array_list_clean_up(&errors);

/*
* Step 5 - Clean up work. Do this here rather than at the end of every caller. Destroy the manager if necessary
*/
s_aws_connection_management_transaction_clean_up(work);

/*
* Step 6 - If some connection acquisition requests failed and we still have more pending acquisitions, try to
* acquire them.
*/
if (has_pending_acquisitions) {
s_aws_http_connection_manager_execute_transaction(&pending_acquisitions_work);
}
}

void aws_http_connection_manager_acquire_connection(
Expand Down Expand Up @@ -1479,12 +1478,11 @@ static void s_cm_on_connection_ready_or_failed(
work->connection_to_release = connection;
}
} else {
/* fail acquisition as one connection cannot be used any more */
while (manager->pending_acquisition_count >
manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count) {
if (manager->pending_acquisition_count > 0) {
/* fail acquisition as connection acquire failed */
AWS_LOGF_DEBUG(
AWS_LS_HTTP_CONNECTION_MANAGER,
"id=%p: Failing excess connection acquisition with error code %d",
"id=%p: Failing connection acquisition with error code %d",
(void *)manager,
(int)error_code);
s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error_code, &work->completions);
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ add_net_test_case(connection_manager_setup_shutdown)
add_net_test_case(connection_manager_acquire_release_mix_synchronous)
add_net_test_case(connection_manager_acquisition_timeout)
add_net_test_case(connection_manager_connect_callback_failure)
add_net_test_case(test_connection_manager_connect_callback_async_failure)
add_net_test_case(test_connection_manager_connect_callback_async_with_immediate_failure)
add_net_test_case(connection_manager_connect_immediate_failure)
add_net_test_case(connection_manager_proxy_setup_shutdown)
add_net_test_case(connection_manager_idle_culling_single)
Expand Down
142 changes: 139 additions & 3 deletions tests/test_connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ AWS_TEST_CASE(
connection_manager_max_pending_acquisitions_with_vended_connections,
s_test_connection_manager_max_pending_acquisitions_with_vended_connections);

static int s_aws_http_connection_manager_create_connection_sync_mock(
static int s_aws_http_connection_manager_create_connection_validate(
const struct aws_http_client_connection_options *options) {
struct cm_tester *tester = &s_tester;

Expand All @@ -830,8 +830,6 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
ASSERT_TRUE(aws_byte_cursor_eq_c_str(&interface_name, options->socket_options->network_interface_name));
}

size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);

ASSERT_SUCCESS(aws_mutex_lock(&tester->lock));
tester->release_connection_fn = options->on_shutdown;
ASSERT_SUCCESS(aws_mutex_unlock(&tester->lock));
Expand All @@ -847,7 +845,15 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
ASSERT_UINT_EQUALS(options->proxy_options->connection_type, tester->verify_proxy_options->connection_type);
}

return AWS_OP_SUCCESS;
}
static int s_aws_http_connection_manager_create_connection_sync_mock(
const struct aws_http_client_connection_options *options) {
s_aws_http_connection_manager_create_connection_validate(options);
struct cm_tester *tester = &s_tester;

struct mock_connection *connection = NULL;
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);

if (next_connection_id < aws_array_list_length(&tester->mock_connections)) {
aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id);
Expand All @@ -868,6 +874,63 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN);
}

struct connect_task_args {
void *user_data;
struct mock_connection *connection;
aws_http_on_client_connection_setup_fn *on_setup;
};

static void s_aws_http_connection_manager_connect_task(
struct aws_task *task,
void *user_data,
enum aws_task_status status) {
(void)status;
struct cm_tester *tester = &s_tester;

struct connect_task_args *task_args = user_data;
struct mock_connection *connection = task_args->connection;
if (connection) {
if (connection->result == AWS_NCRT_SUCCESS) {
task_args->on_setup((struct aws_http_connection *)connection, AWS_ERROR_SUCCESS, task_args->user_data);
} else if (connection->result == AWS_NCRT_ERROR_VIA_CALLBACK) {
task_args->on_setup(NULL, AWS_ERROR_HTTP_UNKNOWN, task_args->user_data);
} else {
AWS_FATAL_ASSERT(0 && "Unexpected connection->result");
}
}

aws_mem_release(tester->allocator, task);
aws_mem_release(tester->allocator, task_args);
}

static int s_aws_http_connection_manager_connect_async_mock(const struct aws_http_client_connection_options *options) {
s_aws_http_connection_manager_create_connection_validate(options);
struct cm_tester *tester = &s_tester;

struct mock_connection *connection = NULL;
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);
if (next_connection_id < aws_array_list_length(&tester->mock_connections)) {
aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id);
}

if (connection->result == AWS_NCRT_ERROR_FROM_CREATE) {
return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN);
}

struct aws_task *task = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_task));
struct connect_task_args *task_args = aws_mem_calloc(options->allocator, 1, sizeof(struct connect_task_args));
task_args->connection = connection;
task_args->user_data = options->user_data;
task_args->on_setup = options->on_setup;
aws_task_init(task, s_aws_http_connection_manager_connect_task, task_args, "create_connection_task");

struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(tester->event_loop_group);
uint64_t now;
ASSERT_SUCCESS(aws_event_loop_current_clock_time(event_loop, &now));
aws_event_loop_schedule_task_future(event_loop, task, now + 1000000000);
return AWS_OP_SUCCESS;
}

static void s_aws_http_connection_manager_release_connection_sync_mock(struct aws_http_connection *connection) {
(void)connection;

Expand Down Expand Up @@ -920,6 +983,17 @@ static struct aws_http_connection_manager_system_vtable s_synchronous_mocks = {
.aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock,
};

static struct aws_http_connection_manager_system_vtable s_async_connect_mock = {
.aws_http_client_connect = s_aws_http_connection_manager_connect_async_mock,
.aws_http_connection_release = s_aws_http_connection_manager_release_connection_sync_mock,
.aws_http_connection_close = s_aws_http_connection_manager_close_connection_sync_mock,
.aws_http_connection_new_requests_allowed = s_aws_http_connection_manager_is_connection_available_sync_mock,
.aws_high_res_clock_get_ticks = aws_high_res_clock_get_ticks,
.aws_http_connection_get_channel = s_aws_http_connection_manager_connection_get_channel_sync_mock,
.aws_channel_thread_is_callers_thread = s_aws_http_connection_manager_is_callers_thread_sync_mock,
.aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock,
};

static int s_test_connection_manager_with_network_interface_list(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
struct aws_byte_cursor *interface_names_array = aws_mem_calloc(allocator, 3, sizeof(struct aws_byte_cursor));
Expand Down Expand Up @@ -1055,6 +1129,68 @@ static int s_test_connection_manager_connect_callback_failure(struct aws_allocat
}
AWS_TEST_CASE(connection_manager_connect_callback_failure, s_test_connection_manager_connect_callback_failure);

static int s_test_connection_manager_connect_callback_async_failure(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
int error_connections = 5;
int success_connections = 5;
struct cm_tester_options options = {
.allocator = allocator,
.max_connections = error_connections,
.mock_table = &s_async_connect_mock,
};

ASSERT_SUCCESS(s_cm_tester_init(&options));

s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false);
s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true);

s_acquire_connections(error_connections + success_connections);

ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections));

ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections);
ASSERT_SUCCESS(s_release_connections(success_connections, false));

ASSERT_SUCCESS(s_cm_tester_clean_up());

return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(
test_connection_manager_connect_callback_async_failure,
s_test_connection_manager_connect_callback_async_failure);

static int s_test_connection_manager_connect_callback_async_with_immediate_failure(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;
int error_connections = 5;
int success_connections = 5;
struct cm_tester_options options = {
.allocator = allocator,
.max_connections = error_connections + success_connections,
.mock_table = &s_async_connect_mock,
};

ASSERT_SUCCESS(s_cm_tester_init(&options));

s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true);
s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false);

s_acquire_connections(error_connections + success_connections);

ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections));

ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections);
ASSERT_SUCCESS(s_release_connections(success_connections, false));

ASSERT_SUCCESS(s_cm_tester_clean_up());

return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(
test_connection_manager_connect_callback_async_with_immediate_failure,
s_test_connection_manager_connect_callback_async_with_immediate_failure);

static int s_test_connection_manager_connect_immediate_failure(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

Expand Down