Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions oshmem/mca/spml/ucx/spml_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,19 +573,17 @@ static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t
array->ctxs_count++;
}

/*
 * _ctx_remove: drop `ctx` from `array`. On a match the slot is overwritten
 * with the last entry, the last slot is set to NULL, and ctxs_count shrinks.
 * opal_atomic_wmb() is issued before returning (presumably a write barrier
 * for readers of the array -- confirm against the OPAL atomics header).
 *
 * NOTE(review): this span appears to be a diff rendering with the +/-
 * markers stripped, so two variants of the function are interleaved:
 *   - a two-parameter form that declares its own `i` and scans from 0;
 *   - a three-parameter form where the caller supplies the start index `i`.
 * Which lines belong to the pre- or post-change version is not marked here;
 * confirm against the real file before relying on either.
 */
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx, int i)
{
int i;

/* Scan from index 0 (variant with the local `i`). */
for (i = 0; i < array->ctxs_count; i++) {
/* Scan from the caller-supplied index (variant with the `i` parameter). */
for (; i < array->ctxs_count; i++) {
if (array->ctxs[i] == ctx) {
/* Fill the hole with the last entry; element order is not preserved. */
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
array->ctxs[array->ctxs_count-1] = NULL;
/* Decrement only when the context was actually found. */
array->ctxs_count--;
break;
}
}

/* NOTE(review): this decrement runs even when `ctx` was not found. Together
 * with the in-branch decrement above it would shrink the count twice on a
 * match, so the two presumably belong to different variants -- confirm. */
array->ctxs_count--;
opal_atomic_wmb ();
}

Expand Down Expand Up @@ -681,27 +679,45 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx

/*
 * mca_spml_ucx_ctx_create: hand back a context through `*ctx`.
 *
 * options: option bits for the context; tested against the `options` field
 *          of idle contexts and against SHMEM_CTX_PRIVATE below.
 * ctx:     output; receives the context pointer cast to shmem_ctx_t.
 *
 * Returns the error code from mca_spml_ucx_ctx_create_common() when creation
 * fails, otherwise OSHMEM_SUCCESS.
 *
 * NOTE(review): this span appears to be a diff rendering with the +/-
 * markers stripped. `ucx_ctx` and `rc` are each declared twice, and the
 * create/register sequences appear twice, so pre- and post-change lines are
 * interleaved. Confirm against the real file which lines are live.
 */
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
{
mca_spml_ucx_ctx_t *ucx_ctx;
int rc;
mca_spml_ucx_ctx_t *ucx_ctx = NULL;
mca_spml_ucx_ctx_array_t *idle_array = &mca_spml_ucx.idle_array;
/* NOTE(review): `active_array` is not referenced again in the visible body;
 * the code below spells out mca_spml_ucx.active_array instead. */
mca_spml_ucx_ctx_array_t *active_array = &mca_spml_ucx.active_array;
int i = 0, rc = OSHMEM_SUCCESS;

/* Take a lock controlling context creation. AUX context may set specific
* UCX parameters affecting worker creation, which are not needed for
* regular contexts. */
/* Variant that always creates a context, with no idle-array lookup first. */
pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
rc = mca_spml_ucx_ctx_create_common(options, &ucx_ctx);
pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
if (rc != OSHMEM_SUCCESS) {
return rc;
}

/* Here the progress callback is registered while the active array is still
 * empty, i.e. before the new context is added to it. */
if (mca_spml_ucx.active_array.ctxs_count == 0) {
opal_progress_register(spml_ucx_ctx_progress);
}

/* Check if we have an idle context to reuse */
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
for (i = 0; i < idle_array->ctxs_count; i++) {
/* Bitwise AND: an idle context matches when it shares at least one set
 * option bit with the request. With options == 0 this is never true, so
 * nothing is reused. NOTE(review): confirm that any-bit overlap (rather
 * than equality) is the intended match rule. */
if (idle_array->ctxs[i]->options & options) {
ucx_ctx = idle_array->ctxs[i];
/* Pass the current index as the third argument of _ctx_remove. */
_ctx_remove(idle_array, ucx_ctx, i);
break;
}
}
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);

/* If we cannot reuse, create new ctx */
if (ucx_ctx == NULL) {
pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
rc = mca_spml_ucx_ctx_create_common(options, &ucx_ctx);
pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
if (rc != OSHMEM_SUCCESS) {
return rc;
}
}

/* Only contexts created without SHMEM_CTX_PRIVATE are put on the active array. */
if (!(options & SHMEM_CTX_PRIVATE)) {
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
/* NOTE(review): this test runs after _ctx_add, whose visible tail does
 * ctxs_count++, so the count looks like it cannot be 0 here and the
 * progress callback would never be registered on this path. Confirm
 * whether the check should precede the add. */
if (mca_spml_ucx.active_array.ctxs_count == 0) {
opal_progress_register(spml_ucx_ctx_progress);
}
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
}

(*ctx) = (shmem_ctx_t)ucx_ctx;
return OSHMEM_SUCCESS;
}
Expand All @@ -711,13 +727,14 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
MCA_SPML_CALL(quiet(ctx));

SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
if (!(((mca_spml_ucx_ctx_t *)ctx)->options & SHMEM_CTX_PRIVATE)) {
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx, 0);
}
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);

if (!mca_spml_ucx.active_array.ctxs_count) {
opal_progress_unregister(spml_ucx_ctx_progress);
}
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
}

int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)
Expand Down