Skip to content

Commit 37163ce

Browse files
author
Mamzi Bayatpour [email protected] ()
committed
move num_incomplete_req_ops to osc ucx context
1 parent 08a847c commit 37163ce

File tree

8 files changed

+60
-77
lines changed

8 files changed

+60
-77
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ typedef struct ompi_osc_ucx_component {
3434
bool enable_mpi_threads;
3535
opal_free_list_t requests; /* request free list for the r* communication variants */
3636
bool env_initialized; /* UCX environment is initialized or not */
37-
int num_incomplete_req_ops;
3837
int comm_world_size;
3938
ucp_ep_h *endpoints;
4039
int num_modules;
@@ -47,14 +46,14 @@ typedef struct ompi_osc_ucx_component {
4746

4847
OMPI_DECLSPEC extern ompi_osc_ucx_component_t mca_osc_ucx_component;
4948

50-
#define OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS() \
49+
#define OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(_module) \
5150
do { \
52-
mca_osc_ucx_component.num_incomplete_req_ops++; \
51+
_module->ctx->num_incomplete_req_ops++; \
5352
} while(0);
5453

55-
#define OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS() \
54+
#define OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS(_module) \
5655
do { \
57-
mca_osc_ucx_component.num_incomplete_req_ops--; \
56+
_module->ctx->num_incomplete_req_ops--; \
5857
} while(0);
5958

6059
typedef enum ompi_osc_ucx_epoch {

ompi/mca/osc/ucx/osc_ucx_active_target.c

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ int ompi_osc_ucx_fence(int mpi_assert, struct ompi_win_t *win) {
7474
}
7575

7676
if (!(mpi_assert & MPI_MODE_NOPRECEDE)) {
77-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
78-
&mca_osc_ucx_component.num_incomplete_req_ops, 0/*ignore*/);
77+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
7978
if (ret != OMPI_SUCCESS) {
8079
return ret;
8180
}
@@ -172,8 +171,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
172171
return OMPI_ERR_RMA_SYNC;
173172
}
174173

175-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
176-
&mca_osc_ucx_component.num_incomplete_req_ops, 0/*ignore*/);
174+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
177175
if (ret != OMPI_SUCCESS) {
178176
return ret;
179177
}
@@ -193,9 +191,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
193191
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_post failed: %d", ret);
194192
}
195193

196-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
197-
&mca_osc_ucx_component.num_incomplete_req_ops,
198-
module->start_grp_ranks[i]);
194+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, module->start_grp_ranks[i]);
199195
if (ret != OMPI_SUCCESS) {
200196
return ret;
201197
}

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
290290
goto cleanup;
291291
}
292292

293-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
294-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
293+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
295294
if (ret != OPAL_SUCCESS) {
296295
ret = OMPI_ERROR;
297296
goto cleanup;
@@ -669,8 +668,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
669668
return ret;
670669
}
671670

672-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
673-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
671+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
674672
if (ret != OMPI_SUCCESS) {
675673
free(temp_addr);
676674
return ret;
@@ -730,8 +728,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
730728

731729
}
732730

733-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
734-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
731+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
735732
if (ret != OPAL_SUCCESS) {
736733
return ret;
737734
}
@@ -844,8 +841,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
844841
return OMPI_ERROR;
845842
}
846843

847-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
848-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
844+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
849845
if (ret != OPAL_SUCCESS) {
850846
return ret;
851847
}
@@ -968,8 +964,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
968964

969965
if (op != &ompi_mpi_op_no_op.op) {
970966
if (op == &ompi_mpi_op_replace.op) {
971-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
972-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
967+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
973968
if (ret != OMPI_SUCCESS) {
974969
return ret;
975970
}
@@ -1009,8 +1004,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10091004
return ret;
10101005
}
10111006

1012-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
1013-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
1007+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
10141008
if (ret != OMPI_SUCCESS) {
10151009
return ret;
10161010
}
@@ -1066,8 +1060,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10661060
}
10671061
}
10681062

1069-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
1070-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
1063+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
10711064
if (ret != OPAL_SUCCESS) {
10721065
return ret;
10731066
}
@@ -1134,8 +1127,9 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11341127
}
11351128

11361129
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
1130+
ucx_req->module = module;
11371131

1138-
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS();
1132+
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(module);
11391133
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req, ep);
11401134

11411135
if (ret != OMPI_SUCCESS) {
@@ -1189,8 +1183,9 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11891183
}
11901184

11911185
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
1186+
ucx_req->module = module;
11921187

1193-
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS();
1188+
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(module);
11941189
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req, ep);
11951190

11961191
if (ret != OMPI_SUCCESS) {
@@ -1232,6 +1227,7 @@ int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count,
12321227
}
12331228

12341229
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
1230+
ucx_req->module = module;
12351231
assert(NULL != ucx_req);
12361232

12371233
ret = accumulate_req(origin_addr, origin_count, origin_dt, target, target_disp,
@@ -1264,6 +1260,7 @@ int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count,
12641260
}
12651261

12661262
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
1263+
ucx_req->module = module;
12671264
assert(NULL != ucx_req);
12681265

12691266
ret = get_accumulate_req(origin_addr, origin_count, origin_datatype,
@@ -1302,7 +1299,7 @@ static inline int ompi_osc_ucx_acc_rputget(void *stage_addr, int stage_count,
13021299
ucx_req->acc.op = op;
13031300
ucx_req->acc.acc_type = acc_type;
13041301
ucx_req->acc.phase = phase;
1305-
ucx_req->acc.module = module;
1302+
ucx_req->module = module;
13061303
ucx_req->acc.target = target;
13071304
ucx_req->acc.lock_acquired = lock_acquired;
13081305
ucx_req->acc.win = win;
@@ -1349,7 +1346,7 @@ static inline int ompi_osc_ucx_acc_rputget(void *stage_addr, int stage_count,
13491346

13501347
module->skip_sync_check = sync_check;
13511348
if (acc_type != NONE) {
1352-
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS();
1349+
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(module);
13531350
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req, ep);
13541351

13551352
if (ret != OMPI_SUCCESS) {
@@ -1414,9 +1411,8 @@ static int ompi_osc_ucx_get_accumulate_nonblocking(const void *origin_addr, int
14141411
return ret;
14151412
}
14161413

1417-
if (mca_osc_ucx_component.num_incomplete_req_ops > ompi_osc_ucx_outstanding_ops_flush_threshold) {
1418-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
1419-
&mca_osc_ucx_component.num_incomplete_req_ops, 0);
1414+
if (module->ctx->num_incomplete_req_ops > ompi_osc_ucx_outstanding_ops_flush_threshold) {
1415+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
14201416
if (ret != OPAL_SUCCESS) {
14211417
ret = OMPI_ERROR;
14221418
return ret;
@@ -1484,7 +1480,7 @@ static int ompi_osc_ucx_get_accumulate_nonblocking(const void *origin_addr, int
14841480
void req_completion(void *request) {
14851481
ompi_osc_ucx_request_t *req = (ompi_osc_ucx_request_t *)request;
14861482
int ret = OMPI_SUCCESS;
1487-
1483+
ompi_osc_ucx_module_t *module = req->module;
14881484
if (req->acc.acc_type != NONE) {
14891485
assert(req->acc.phase != ACC_INIT);
14901486
void *free_addr = NULL;
@@ -1505,8 +1501,8 @@ void req_completion(void *request) {
15051501

15061502
if (req->acc.phase != ACC_FINALIZE) {
15071503
/* Avoid calling flush while we are already in progress */
1508-
req->acc.module->mem->skip_periodic_flush = true;
1509-
req->acc.module->state_mem->skip_periodic_flush = true;
1504+
module->mem->skip_periodic_flush = true;
1505+
module->state_mem->skip_periodic_flush = true;
15101506
}
15111507

15121508
switch (req->acc.phase) {
@@ -1620,7 +1616,7 @@ void req_completion(void *request) {
16201616
if (req->acc.acc_type == GET_ACCUMULATE) {
16211617
/* Do fence to make sure target results are received before
16221618
* writing into target */
1623-
ret = opal_common_ucx_wpmem_fence(req->acc.module->mem);
1619+
ret = opal_common_ucx_wpmem_fence(module->mem);
16241620
if (ret != OMPI_SUCCESS) {
16251621
OSC_UCX_ERROR("opal_common_ucx_mem_fence failed: %d", ret);
16261622
abort();
@@ -1650,17 +1646,17 @@ void req_completion(void *request) {
16501646
if (release_lock) {
16511647
/* Ordering between previous put/get operations and unlock will be realized
16521648
* through the ucp fence inside the finalize function */
1653-
ompi_osc_ucx_nonblocking_ops_finalize(req->acc.module, target,
1649+
ompi_osc_ucx_nonblocking_ops_finalize(module, target,
16541650
req->acc.lock_acquired, win, free_addr);
16551651
}
16561652

16571653
if (req->acc.phase != ACC_FINALIZE) {
1658-
req->acc.module->mem->skip_periodic_flush = false;
1659-
req->acc.module->state_mem->skip_periodic_flush = false;
1654+
module->mem->skip_periodic_flush = false;
1655+
module->state_mem->skip_periodic_flush = false;
16601656
}
16611657
}
16621658

1663-
OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS();
1659+
OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS(module);
16641660
ompi_request_complete(&(req->super), true);
1665-
assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0);
1661+
assert(module->ctx->num_incomplete_req_ops >= 0);
16661662
}

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = {
7676
},
7777
.wpool = NULL,
7878
.env_initialized = false,
79-
.num_incomplete_req_ops = 0,
8079
.num_modules = 0,
8180
.acc_single_intrinsic = false,
8281
.comm_world_size = 0,
@@ -299,7 +298,6 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
299298
}
300299

301300
static int component_finalize(void) {
302-
assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
303301
if (!opal_mca_common_ucx_mpi_thread_multiple_enabled) {
304302
int i;
305303
for (i = 0; i < mca_osc_ucx_component.comm_world_size; i++) {
@@ -959,8 +957,7 @@ inline int ompi_osc_ucx_state_unlock(
959957
assert(result_value == TARGET_LOCK_EXCLUSIVE);
960958
} else if (NULL != free_ptr){
961959
/* flush before freeing the buffer */
962-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
963-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
960+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
964961
}
965962
/* TODO: encapsulate in a request and make the release non-blocking */
966963
if (NULL != free_ptr) {
@@ -987,6 +984,7 @@ inline int ompi_osc_ucx_nonblocking_ops_finalize(ompi_osc_ucx_module_t *module,
987984
ucx_req->acc.free_ptr = free_ptr;
988985
ucx_req->acc.phase = ACC_FINALIZE;
989986
ucx_req->acc.acc_type = ANY;
987+
ucx_req->module = module;
990988

991989
/* Fence any still active operations */
992990
ret = opal_common_ucx_wpmem_fence(module->mem);
@@ -996,7 +994,7 @@ inline int ompi_osc_ucx_nonblocking_ops_finalize(ompi_osc_ucx_module_t *module,
996994
}
997995

998996
if (lock_acquired) {
999-
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS();
997+
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(module);
1000998
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem,
1001999
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
10021000
target, &(module->req_result), sizeof(module->req_result),
@@ -1009,7 +1007,7 @@ inline int ompi_osc_ucx_nonblocking_ops_finalize(ompi_osc_ucx_module_t *module,
10091007
} else {
10101008
/* Lock is not acquired, but still, we need to know when the
10111009
* acc is finalized so that we can free the temp buffers */
1012-
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS();
1010+
OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(module);
10131011
ret = opal_common_ucx_wpmem_flush_ep_nb(module->mem, target, req_completion, ucx_req, ep);
10141012

10151013
if (ret != OMPI_SUCCESS) {
@@ -1145,8 +1143,7 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
11451143
}
11461144
OBJ_DESTRUCT(&module->pending_posts);
11471145

1148-
opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
1149-
&mca_osc_ucx_component.num_incomplete_req_ops, 0);
1146+
opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
11501147

11511148
ret = module->comm->c_coll->coll_barrier(module->comm,
11521149
module->comm->c_coll->coll_barrier_module);

ompi/mca/osc/ucx/osc_ucx_passive_target.c

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
163163
return OMPI_ERR_RMA_SYNC;
164164
}
165165

166-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
167-
&mca_osc_ucx_component.num_incomplete_req_ops, 0);
166+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
168167
if (ret != OMPI_SUCCESS) {
169168
return ret;
170169
}
@@ -237,8 +236,7 @@ int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
237236

238237
assert(module->lock_count == 0);
239238

240-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
241-
&mca_osc_ucx_component.num_incomplete_req_ops, 0);
239+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
242240
if (ret != OMPI_SUCCESS) {
243241
return ret;
244242
}
@@ -283,8 +281,7 @@ int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
283281
return OMPI_ERR_RMA_SYNC;
284282
}
285283

286-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP,
287-
&mca_osc_ucx_component.num_incomplete_req_ops, target);
284+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
288285
if (ret != OMPI_SUCCESS) {
289286
return ret;
290287
}
@@ -301,8 +298,7 @@ int ompi_osc_ucx_flush_all(struct ompi_win_t *win) {
301298
return OMPI_ERR_RMA_SYNC;
302299
}
303300

304-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
305-
&mca_osc_ucx_component.num_incomplete_req_ops, 0);
301+
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
306302
if (ret != OMPI_SUCCESS) {
307303
return ret;
308304
}

ompi/mca/osc/ucx/osc_ucx_request.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ typedef struct ompi_osc_ucx_accumulate_request {
3737
struct ompi_op_t *op;
3838
int phase;
3939
bool lock_acquired;
40-
ompi_osc_ucx_module_t *module;
4140
int target;
4241
struct ompi_win_t *win;
4342
const void *origin_addr;
@@ -55,6 +54,7 @@ typedef struct ompi_osc_ucx_accumulate_request {
5554
typedef struct ompi_osc_ucx_request {
5655
ompi_request_t super;
5756
ompi_osc_ucx_accumulate_request_t acc;
57+
ompi_osc_ucx_module_t *module;
5858
} ompi_osc_ucx_request_t;
5959

6060
OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);
@@ -65,7 +65,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);
6565
do { \
6666
item = opal_free_list_get(&mca_osc_ucx_component.requests); \
6767
if (item == NULL) { \
68-
if (mca_osc_ucx_component.num_incomplete_req_ops > 0) { \
68+
if (module->ctx->num_incomplete_req_ops > 0) { \
6969
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool); \
7070
} \
7171
} \
@@ -76,10 +76,10 @@ OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);
7676
req->super.req_complete = false; \
7777
req->super.req_state = OMPI_REQUEST_ACTIVE; \
7878
req->super.req_status.MPI_ERROR = MPI_SUCCESS; \
79+
req->module = NULL; \
7980
req->acc.op = MPI_NO_OP; \
8081
req->acc.phase = ACC_INIT; \
81-
req->acc.acc_type = NONE; \
82-
req->acc.module = NULL; \
82+
req->acc.acc_type = NONE; \
8383
req->acc.target = -1; \
8484
req->acc.lock_acquired = false; \
8585
req->acc.win = NULL; \

0 commit comments

Comments
 (0)