Skip to content

Commit d91d76a

Browse files
author
Mamzi Bayatpour [email protected] ()
committed
Use an atomic add for the number of outstanding ops and rename the thread-enabled variable
1 parent 37163ce commit d91d76a

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ typedef struct ompi_osc_ucx_component {
4646

4747
OMPI_DECLSPEC extern ompi_osc_ucx_component_t mca_osc_ucx_component;
4848

49-
#define OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(_module) \
50-
do { \
51-
_module->ctx->num_incomplete_req_ops++; \
49+
#define OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(_module) \
50+
do { \
51+
opal_atomic_add_fetch_size_t(&_module->ctx->num_incomplete_req_ops, 1); \
5252
} while(0);
5353

54-
#define OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS(_module) \
55-
do { \
56-
_module->ctx->num_incomplete_req_ops--; \
54+
#define OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS(_module) \
55+
do { \
56+
opal_atomic_add_fetch_size_t(&_module->ctx->num_incomplete_req_ops, -1); \
5757
} while(0);
5858

5959
typedef enum ompi_osc_ucx_epoch {
@@ -164,10 +164,10 @@ typedef struct ompi_osc_ucx_lock {
164164
#define OSC_UCX_GET_EP(_module, rank_) (mca_osc_ucx_component.endpoints[_module->comm_world_ranks[rank_]])
165165
#define OSC_UCX_GET_DISP(module_, rank_) ((module_->disp_unit < 0) ? module_->disp_units[rank_] : module_->disp_unit)
166166

167-
extern bool opal_mca_common_ucx_mpi_thread_multiple_enabled;
167+
extern bool thread_enabled;
168168

169169
#define OSC_UCX_GET_DEFAULT_EP(_ep_ptr, _module, _target) \
170-
if (opal_mca_common_ucx_mpi_thread_multiple_enabled) { \
170+
if (thread_enabled) { \
171171
_ep_ptr = NULL; \
172172
} else { \
173173
_ep_ptr = (ucp_ep_h *)&(OSC_UCX_GET_EP(_module, _target)); \

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ static int component_register(void) {
175175
MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.no_locks);
176176
free(description_str);
177177

178-
opal_mca_common_ucx_mpi_thread_multiple_enabled = opal_using_threads();
178+
thread_enabled = opal_using_threads();
179179
mca_osc_ucx_component.acc_single_intrinsic = false;
180180
opal_asprintf(&description_str, "Enable optimizations for MPI_Fetch_and_op, MPI_Accumulate, etc for codes "
181181
"that will not use anything more than a single predefined datatype (default: %s)",
@@ -188,7 +188,7 @@ static int component_register(void) {
188188
MCA_BASE_VAR_SCOPE_GROUP, &enable_nonblocking_accumulate);
189189
(void) mca_base_component_var_register(&mca_osc_ucx_component.super.osc_version, "enable_wpool_thread_multiple",
190190
description_str, MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_5,
191-
MCA_BASE_VAR_SCOPE_GROUP, &opal_mca_common_ucx_mpi_thread_multiple_enabled);
191+
MCA_BASE_VAR_SCOPE_GROUP, &thread_enabled);
192192
(void) mca_base_component_var_register(&mca_osc_ucx_component.super.osc_version, "outstanding_ops_flush_threshold",
193193
description_str, MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_5,
194194
MCA_BASE_VAR_SCOPE_GROUP, &ompi_osc_ucx_outstanding_ops_flush_threshold);
@@ -298,7 +298,7 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
298298
}
299299

300300
static int component_finalize(void) {
301-
if (!opal_mca_common_ucx_mpi_thread_multiple_enabled) {
301+
if (!thread_enabled) {
302302
int i;
303303
for (i = 0; i < mca_osc_ucx_component.comm_world_size; i++) {
304304
ucp_ep_h ep = mca_osc_ucx_component.endpoints[i];
@@ -497,7 +497,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
497497
OSC_UCX_VERBOSE(1, "opal_common_ucx_wpool_init failed: %d", ret);
498498
goto select_unlock;
499499
}
500-
if (!opal_mca_common_ucx_mpi_thread_multiple_enabled) {
500+
if (!thread_enabled) {
501501
mca_osc_ucx_component.comm_world_size = ompi_proc_world_size();
502502
mca_osc_ucx_component.endpoints = calloc(mca_osc_ucx_component.comm_world_size, sizeof(ucp_ep_h));
503503
}

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ __thread FILE *tls_pf = NULL;
3131
__thread int initialized = 0;
3232
#endif
3333

34-
bool opal_mca_common_ucx_mpi_thread_multiple_enabled;
34+
bool thread_enabled;
3535

3636
static _ctx_record_t *_tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx);
3737
static inline _ctx_record_t *_tlocal_get_ctx_rec(opal_tsd_tracked_key_t tls_key);
@@ -50,7 +50,7 @@ static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool)
5050
ucs_status_t status;
5151
opal_common_ucx_winfo_t *winfo = NULL;
5252

53-
if (opal_mca_common_ucx_mpi_thread_multiple_enabled || wpool->dflt_winfo == NULL) {
53+
if (thread_enabled || wpool->dflt_winfo == NULL) {
5454
memset(&worker_params, 0, sizeof(worker_params));
5555
worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
5656
worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
@@ -98,7 +98,7 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
9898

9999
if (winfo->comm_size != 0) {
100100
size_t i;
101-
if (opal_mca_common_ucx_mpi_thread_multiple_enabled) {
101+
if (thread_enabled) {
102102
for (i = 0; i < winfo->comm_size; i++) {
103103
if (NULL != winfo->endpoints[i]) {
104104
ucp_ep_destroy(winfo->endpoints[i]);
@@ -113,7 +113,7 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
113113
winfo->comm_size = 0;
114114

115115
OBJ_DESTRUCT(&winfo->mutex);
116-
if (opal_mca_common_ucx_mpi_thread_multiple_enabled || winfo->is_dflt_winfo) {
116+
if (thread_enabled || winfo->is_dflt_winfo) {
117117
ucp_worker_destroy(winfo->worker);
118118
}
119119

@@ -732,13 +732,13 @@ OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *me
732732

733733
/* Obtain the endpoint */
734734
if (OPAL_UNLIKELY(NULL == winfo->endpoints[target])) {
735-
if (opal_mca_common_ucx_mpi_thread_multiple_enabled || (dflt_ep == NULL) ||
735+
if (thread_enabled || (dflt_ep == NULL) ||
736736
(*dflt_ep == NULL)) {
737737
rc = _tlocal_ctx_connect(ctx_rec, target);
738738
if (rc != OPAL_SUCCESS) {
739739
return rc;
740740
}
741-
if (!opal_mca_common_ucx_mpi_thread_multiple_enabled && (dflt_ep != NULL) &&
741+
if (!thread_enabled && (dflt_ep != NULL) &&
742742
(*dflt_ep == NULL)) {
743743
/* set the proc ep */
744744
*dflt_ep = winfo->endpoints[target];

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ typedef struct {
8686
char *recv_worker_addrs;
8787
int *recv_worker_displs;
8888
size_t comm_size;
89-
int num_incomplete_req_ops;
89+
opal_atomic_size_t num_incomplete_req_ops;
9090
} opal_common_ucx_ctx_t;
9191

9292
/* Worker Pool memory (wpmem) is an object that represents a remotely accessible

0 commit comments

Comments
 (0)