Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions include/aws/io/channel_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ struct aws_server_socket_channel_bootstrap_options {
uint32_t port;
const struct aws_socket_options *socket_options;
const struct aws_tls_connection_options *tls_options;
aws_server_bootstrap_on_listener_setup_fn *setup_callback;
aws_server_bootstrap_on_accept_channel_setup_fn *incoming_callback;
aws_server_bootstrap_on_accept_channel_shutdown_fn *shutdown_callback;
aws_server_bootstrap_on_listener_setup_fn *setup_callback;
aws_server_bootstrap_on_server_listener_destroy_fn *destroy_callback;
bool enable_read_back_pressure;
void *user_data;
Expand Down Expand Up @@ -297,6 +297,9 @@ AWS_IO_API int aws_server_bootstrap_set_alpn_callback(
* shutting down. Immediately after the `shutdown_callback` returns, the channel is cleaned up automatically. All
* callbacks are invoked the thread of the event-loop that the listening socket is assigned to
*
* `setup_callback`. If set, the callback will be asynchronously invoked when the listener is ready for use. For Apple
* Network Framework, the listener is not usable until the callback is invoked.
*
* Upon shutdown of your application, you'll want to call `aws_server_bootstrap_destroy_socket_listener` with the return
* value from this function.
*
Expand All @@ -305,9 +308,6 @@ AWS_IO_API int aws_server_bootstrap_set_alpn_callback(
AWS_IO_API struct aws_socket *aws_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options);

AWS_IO_API struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options);

/**
* Shuts down 'listener' and cleans up any resources associated with it. Any incoming channels on `listener` will still
* be active. `destroy_callback` will be invoked after the server socket listener is destroyed, and all associated
Expand Down
4 changes: 2 additions & 2 deletions include/aws/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ struct aws_socket_listener_options {
void *on_accept_result_user_data;

// This callback is invoked when the listener starts accepting incoming connections.
// It is only triggered in asynchronous listener APIs while using nw_socket.
aws_socket_on_accept_started_fn *on_accept_start_result;
// If the callback set, the socket must not be released before the callback invoked.
aws_socket_on_accept_started_fn *on_accept_start;
void *on_accept_start_user_data;
};

Expand Down
169 changes: 25 additions & 144 deletions source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1704,141 +1704,6 @@ static void s_listener_destroy_task(struct aws_task *task, void *arg, enum aws_t
aws_socket_clean_up(&server_connection_args->listener);
}

struct aws_socket *aws_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options) {
AWS_PRECONDITION(bootstrap_options);
AWS_PRECONDITION(bootstrap_options->bootstrap);
AWS_PRECONDITION(bootstrap_options->incoming_callback);
AWS_PRECONDITION(bootstrap_options->shutdown_callback);

struct server_connection_args *server_connection_args =
aws_mem_calloc(bootstrap_options->bootstrap->allocator, 1, sizeof(struct server_connection_args));
if (!server_connection_args) {
return NULL;
}

AWS_LOGF_INFO(
AWS_LS_IO_CHANNEL_BOOTSTRAP,
"id=%p: attempting to initialize a new "
"server socket listener for %s:%u",
(void *)bootstrap_options->bootstrap,
bootstrap_options->host_name,
bootstrap_options->port);

aws_ref_count_init(
&server_connection_args->ref_count,
server_connection_args,
(aws_simple_completion_callback *)s_server_connection_args_destroy);
server_connection_args->user_data = bootstrap_options->user_data;
server_connection_args->bootstrap = aws_server_bootstrap_acquire(bootstrap_options->bootstrap);
server_connection_args->shutdown_callback = bootstrap_options->shutdown_callback;
server_connection_args->incoming_callback = bootstrap_options->incoming_callback;
server_connection_args->destroy_callback = bootstrap_options->destroy_callback;
server_connection_args->on_protocol_negotiated = bootstrap_options->bootstrap->on_protocol_negotiated;
server_connection_args->enable_read_back_pressure = bootstrap_options->enable_read_back_pressure;

aws_task_init(
&server_connection_args->listener_destroy_task,
s_listener_destroy_task,
server_connection_args,
"listener socket destroy");

if (bootstrap_options->tls_options) {
AWS_LOGF_INFO(
AWS_LS_IO_CHANNEL_BOOTSTRAP, "id=%p: using tls on listener", (void *)bootstrap_options->tls_options);
if (aws_tls_connection_options_copy(&server_connection_args->tls_options, bootstrap_options->tls_options)) {
goto cleanup_server_connection_args;
}

server_connection_args->use_tls = true;

server_connection_args->tls_user_data = bootstrap_options->tls_options->user_data;

/* in order to honor any callbacks a user may have installed on their tls_connection_options,
* we need to wrap them if they were set.*/
if (bootstrap_options->bootstrap->on_protocol_negotiated) {
server_connection_args->tls_options.advertise_alpn_message = true;
}

if (bootstrap_options->tls_options->on_data_read) {
server_connection_args->user_on_data_read = bootstrap_options->tls_options->on_data_read;
server_connection_args->tls_options.on_data_read = s_tls_server_on_data_read;
}

if (bootstrap_options->tls_options->on_error) {
server_connection_args->user_on_error = bootstrap_options->tls_options->on_error;
server_connection_args->tls_options.on_error = s_tls_server_on_error;
}

if (bootstrap_options->tls_options->on_negotiation_result) {
server_connection_args->user_on_negotiation_result = bootstrap_options->tls_options->on_negotiation_result;
}

server_connection_args->tls_options.on_negotiation_result = s_tls_server_on_negotiation_result;
server_connection_args->tls_options.user_data = server_connection_args;
}

struct aws_event_loop *connection_loop =
aws_event_loop_group_get_next_loop(bootstrap_options->bootstrap->event_loop_group);

if (aws_socket_init(
&server_connection_args->listener,
bootstrap_options->bootstrap->allocator,
bootstrap_options->socket_options)) {
goto cleanup_server_connection_args;
}

struct aws_socket_endpoint endpoint;
AWS_ZERO_STRUCT(endpoint);
size_t host_name_len = 0;
if (aws_secure_strlen(bootstrap_options->host_name, sizeof(endpoint.address), &host_name_len)) {
goto cleanup_server_connection_args;
}

memcpy(endpoint.address, bootstrap_options->host_name, host_name_len);
endpoint.port = bootstrap_options->port;

if (aws_socket_bind(&server_connection_args->listener, &endpoint)) {
goto cleanup_listener;
}

if (aws_socket_listen(&server_connection_args->listener, 1024)) {
goto cleanup_listener;
}

struct aws_socket_listener_options options = {
.on_accept_result = s_on_server_connection_result,
.on_accept_result_user_data = server_connection_args,
.on_accept_start_result = NULL,
.on_accept_start_user_data = NULL,
};

if (aws_socket_start_accept(&server_connection_args->listener, connection_loop, options)) {
goto cleanup_listener;
}

return &server_connection_args->listener;

cleanup_listener:

; // This line just used to avoid expression error after the label

SETUP_SOCKET_SHUTDOWN_CALLBACKS(
bootstrap_options->bootstrap->allocator,
&server_connection_args->listener,
socket_shutdown_release_server_connection_args,
s_socket_shutdown_complete_release_server_connection_fn,
server_connection_args)

aws_socket_clean_up(&server_connection_args->listener);
return NULL;

cleanup_server_connection_args:
s_server_connection_args_release(server_connection_args);

return NULL;
}

/* Called when a listener connection attempt task completes.
*/
static void s_on_listener_connection_established(struct aws_socket *socket, int error_code, void *user_data) {
Expand Down Expand Up @@ -1873,13 +1738,16 @@ static void s_on_listener_connection_established(struct aws_socket *socket, int
return;
}

struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options) {
struct aws_socket *s_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options,
bool async_setup) {
AWS_PRECONDITION(bootstrap_options);
AWS_PRECONDITION(bootstrap_options->bootstrap);
AWS_PRECONDITION(bootstrap_options->incoming_callback);
AWS_PRECONDITION(bootstrap_options->shutdown_callback);
AWS_PRECONDITION(bootstrap_options->setup_callback);
if (async_setup) {
AWS_PRECONDITION(bootstrap_options->setup_callback);
}

struct server_connection_args *server_connection_args =
aws_mem_calloc(bootstrap_options->bootstrap->allocator, 1, sizeof(struct server_connection_args));
Expand Down Expand Up @@ -1977,18 +1845,26 @@ struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
goto cleanup_listener;
}

// Acquire for listener establish callbacks, should be released in `s_on_listener_connection_established`
s_server_connection_args_acquire(server_connection_args);

struct aws_socket_listener_options options = {
.on_accept_result = s_on_server_connection_result,
.on_accept_result_user_data = server_connection_args,
.on_accept_start_result = s_on_listener_connection_established,
.on_accept_start_user_data = server_connection_args,
.on_accept_start = NULL,
.on_accept_start_user_data = NULL,
};

if (async_setup) {
// If we use an async socket, acquire the connection args for listener establish callbacks, if
// aws_socket_start_accept succeed, the args should be released in `s_on_listener_connection_established`
s_server_connection_args_acquire(server_connection_args);
options.on_accept_start = s_on_listener_connection_established;
options.on_accept_start_user_data = server_connection_args;
}

if (aws_socket_start_accept(&server_connection_args->listener, connection_loop, options)) {
s_server_connection_args_release(server_connection_args);
if (async_setup) {
// release the args we acquired above
s_server_connection_args_release(server_connection_args);
}
goto cleanup_listener;
}

Expand All @@ -2014,6 +1890,11 @@ struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
return NULL;
}

struct aws_socket *aws_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options) {
return s_server_bootstrap_new_socket_listener(bootstrap_options, bootstrap_options->setup_callback);
}

void aws_server_bootstrap_destroy_socket_listener(struct aws_server_bootstrap *bootstrap, struct aws_socket *listener) {
struct server_connection_args *server_connection_args =
AWS_CONTAINER_OF(listener, struct server_connection_args, listener);
Expand Down
35 changes: 13 additions & 22 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,6 @@ static int s_socket_listen_fn(struct aws_socket *socket, int backlog_size) {
struct listener_state_changed_args {
struct aws_task task;
struct aws_allocator *allocator;
struct aws_socket *socket;
struct nw_socket *nw_socket;
nw_listener_state_t state;
int error;
Expand All @@ -1588,10 +1587,9 @@ static void s_process_listener_state_changed_task(struct aws_task *task, void *a
(void *)nw_socket,
(void *)nw_listener);

/* Ideally we should not have a canceled task here, as nw_socket keeps a reference to event loop, therefore the
* event loop should never be destroyed before the nw_socket get destroyed. If we manually cancel the task, we
* should make sure we carefully handled the state change eventually, as the socket relies on this task to release
* and cleanup.
/* Ideally we should not have a task with AWS_TASK_STATUS_CANCELED here, as the event loop should never be destroyed
* before the nw_socket get destroyed. If we manually cancel the task, we should make sure we carefully handled the
* state change eventually, as the socket relies on this task to release and cleanup.
*/
if (status != AWS_TASK_STATUS_CANCELED) {

Expand All @@ -1612,10 +1610,10 @@ static void s_process_listener_state_changed_task(struct aws_task *task, void *a
crt_error_code);

s_lock_base_socket(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
s_lock_socket_synced_data(nw_socket);
s_set_socket_state(nw_socket, listener_state_changed_args->socket, ERROR);
s_set_socket_state(nw_socket, aws_socket, ERROR);
s_unlock_socket_synced_data(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
if (nw_socket->on_accept_started_fn) {
nw_socket->on_accept_started_fn(
aws_socket, crt_error_code, nw_socket->listen_accept_started_user_data);
Expand Down Expand Up @@ -1646,9 +1644,12 @@ static void s_process_listener_state_changed_task(struct aws_task *task, void *a
case nw_listener_state_cancelled: {
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "id=%p handle=%p: listener cancelled.", (void *)nw_socket, (void *)nw_listener);
s_lock_base_socket(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
s_lock_socket_synced_data(nw_socket);
s_set_socket_state(nw_socket, listener_state_changed_args->socket, CLOSED);
s_set_socket_state(nw_socket, aws_socket, CLOSED);
s_unlock_socket_synced_data(nw_socket);
s_unlock_base_socket(nw_socket);
s_socket_release_internal_ref(nw_socket);
} break;
default:
Expand Down Expand Up @@ -1679,13 +1680,10 @@ static void s_handle_listener_state_changed_fn(
nw_error_code,
crt_error_code);

s_lock_base_socket(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
if (aws_socket && s_validate_event_loop(nw_socket->event_loop)) {
if (s_validate_event_loop(nw_socket->event_loop)) {
struct listener_state_changed_args *args =
aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct listener_state_changed_args));

args->socket = aws_socket;
args->nw_socket = nw_socket;
args->allocator = nw_socket->allocator;
args->error = crt_error_code;
Expand All @@ -1694,16 +1692,9 @@ static void s_handle_listener_state_changed_fn(
s_socket_acquire_internal_ref(nw_socket);
aws_task_init(&args->task, s_process_listener_state_changed_task, args, "ListenerStateChangedTask");
aws_event_loop_schedule_task_now(nw_socket->event_loop, &args->task);
} else if (state == nw_listener_state_cancelled) {
// If socket is already destroyed and the listener is canceled, directly closed the internal socket.
s_lock_socket_synced_data(nw_socket);
s_set_socket_state(nw_socket, aws_socket, CLOSED);
s_unlock_socket_synced_data(nw_socket);

s_socket_release_internal_ref(nw_socket);
} else {
AWS_FATAL_ASSERT(true && "The nw_socket should be always attached to a validate event loop.");
}

s_unlock_base_socket(nw_socket);
}

static int s_socket_start_accept_fn(
Expand Down Expand Up @@ -1740,7 +1731,7 @@ static int s_socket_start_accept_fn(
socket->accept_result_fn = options.on_accept_result;
socket->connect_accept_user_data = options.on_accept_result_user_data;

nw_socket->on_accept_started_fn = options.on_accept_start_result;
nw_socket->on_accept_started_fn = options.on_accept_start;
nw_socket->listen_accept_started_user_data = options.on_accept_start_user_data;

s_set_event_loop(socket, accept_loop);
Expand Down
Loading
Loading