Commit 1e80628

Mamzi Bayatpour (mbayatpour@nvidia.com) authored and janjust committed

OSC/UCX: Add the following optimizations: 1) reuse the same worker/eps in
single-threaded applications, which is helpful when an application creates many
windows, since it avoids unnecessary overhead, and 2) add a truly nonblocking
MPI_Accumulate/MPI_Get_accumulate.

Signed-off-by: Mamzi Bayatpour <[email protected]>
Co-authored-by: Tomislav Janjusic <[email protected]>
1 parent 8fbfe4c commit 1e80628
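
For context, the two optimizations target a workload pattern like the following: a single-threaded MPI application that creates many windows and issues streams of accumulate operations. This sketch is illustrative only; the window count, datatypes, and loop sizes are made up here, not taken from the commit:

/* Illustrative target workload: single-threaded, many windows, accumulate-heavy. */
#include <mpi.h>
#include <stdint.h>

#define NWIN 64

int main(int argc, char **argv)
{
    int rank, nprocs;
    MPI_Win win[NWIN];
    uint64_t *buf[NWIN];

    MPI_Init(&argc, &argv);   /* no MPI_THREAD_MULTIPLE: osc/ucx can reuse one worker/ep set */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    /* Before this commit, every window creation paid its own worker/ep setup cost. */
    for (int w = 0; w < NWIN; w++) {
        MPI_Win_allocate(sizeof(uint64_t), sizeof(uint64_t), MPI_INFO_NULL,
                         MPI_COMM_WORLD, &buf[w], &win[w]);
    }

    uint64_t one = 1;
    MPI_Win_lock_all(0, win[0]);
    for (int i = 0; i < 1000; i++) {
        /* With the truly nonblocking path, these can return before remote completion... */
        MPI_Accumulate(&one, 1, MPI_UINT64_T, (rank + 1) % nprocs,
                       0, 1, MPI_UINT64_T, MPI_SUM, win[0]);
    }
    MPI_Win_flush_all(win[0]);   /* ...and are completed here, at synchronization. */
    MPI_Win_unlock_all(win[0]);

    for (int w = 0; w < NWIN; w++) {
        MPI_Win_free(&win[w]);
    }
    MPI_Finalize();
    return 0;
}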

File tree

10 files changed: +665 additions, -102 deletions


ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 29 additions & 2 deletions
@@ -27,6 +27,7 @@
 #define OMPI_OSC_UCX_ATTACH_MAX 48
 #define OMPI_OSC_UCX_MEM_ADDR_MAX_LEN 1024
 
+
 typedef struct ompi_osc_ucx_component {
     ompi_osc_base_component_t super;
     opal_common_ucx_wpool_t *wpool;
@@ -125,6 +126,7 @@ typedef struct ompi_osc_ucx_module {
     opal_common_ucx_wpmem_t *mem;
     opal_common_ucx_wpmem_t *state_mem;
 
+    bool skip_sync_check;
     bool noncontig_shared_win;
     size_t *sizes;
     /* in shared windows, shmem_addrs can be used for direct load store to
@@ -150,6 +152,17 @@ typedef struct ompi_osc_ucx_lock {
 #define OSC_UCX_GET_EP(comm_, rank_) (ompi_comm_peer_lookup(comm_, rank_)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX])
 #define OSC_UCX_GET_DISP(module_, rank_) ((module_->disp_unit < 0) ? module_->disp_units[rank_] : module_->disp_unit)
 
+extern bool mpi_thread_multiple_enabled;
+
+#define OSC_UCX_GET_DEFAULT_EP(_ep_ptr, _comm, _target) \
+    if (mpi_thread_multiple_enabled) { \
+        _ep_ptr = NULL; \
+    } else { \
+        _ep_ptr = (ucp_ep_h *)&(OSC_UCX_GET_EP(_comm, _target)); \
+    }
+
+#define OSC_UCX_OUTSTANDING_OPS_FLUSH_THRESHOLD 64
+
 int ompi_osc_ucx_shared_query(struct ompi_win_t *win, int rank, size_t *size,
                               int *disp_unit, void * baseptr);
 int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len);
@@ -169,6 +182,11 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
                             int target, ptrdiff_t target_disp, int target_count,
                             struct ompi_datatype_t *target_dt,
                             struct ompi_op_t *op, struct ompi_win_t *win);
+int ompi_osc_ucx_accumulate_nb(const void *origin_addr, int origin_count,
+                               struct ompi_datatype_t *origin_dt,
+                               int target, ptrdiff_t target_disp, int target_count,
+                               struct ompi_datatype_t *target_dt,
+                               struct ompi_op_t *op, struct ompi_win_t *win);
 int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
                                   void *result_addr, struct ompi_datatype_t *dt,
                                   int target, ptrdiff_t target_disp,
@@ -184,6 +202,13 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
                                 int target_rank, ptrdiff_t target_disp,
                                 int target_count, struct ompi_datatype_t *target_datatype,
                                 struct ompi_op_t *op, struct ompi_win_t *win);
+int ompi_osc_ucx_get_accumulate_nb(const void *origin_addr, int origin_count,
+                                   struct ompi_datatype_t *origin_datatype,
+                                   void *result_addr, int result_count,
+                                   struct ompi_datatype_t *result_datatype,
+                                   int target_rank, ptrdiff_t target_disp,
+                                   int target_count, struct ompi_datatype_t *target_datatype,
+                                   struct ompi_op_t *op, struct ompi_win_t *win);
 int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
                       struct ompi_datatype_t *origin_dt,
                       int target, ptrdiff_t target_disp, int target_count,
@@ -229,9 +254,11 @@ int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_
                                            int min_index, int max_index,
                                            uint64_t base, size_t len, int *insert);
 extern inline bool ompi_osc_need_acc_lock(ompi_osc_ucx_module_t *module, int target);
-extern inline int ompi_osc_state_lock(ompi_osc_ucx_module_t *module, int target,
+extern inline int ompi_osc_ucx_state_lock(ompi_osc_ucx_module_t *module, int target,
                                       bool *lock_acquired, bool force_lock);
-extern inline int ompi_osc_state_unlock(ompi_osc_ucx_module_t *module, int target,
+extern inline int ompi_osc_ucx_state_unlock(ompi_osc_ucx_module_t *module, int target,
                                         bool lock_acquired, void *free_ptr);
+extern inline int ompi_osc_ucx_state_unlock_nb(ompi_osc_ucx_module_t *module, int target,
+                                               bool lock_acquired, struct ompi_win_t *win);
 
 #endif /* OMPI_OSC_UCX_H */
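
The new OSC_UCX_GET_DEFAULT_EP macro above carries the first optimization: when the job runs with MPI_THREAD_MULTIPLE, the endpoint pointer is left NULL so the common-UCX wpool layer resolves a per-thread worker/endpoint as before, while single-threaded jobs reuse the UCX endpoint already stored on the peer proc. A function-style restatement of the same logic, with a hypothetical helper name, purely for readability:

/* Hypothetical helper restating OSC_UCX_GET_DEFAULT_EP; not part of the commit. */
static inline ucp_ep_h *osc_ucx_default_ep(struct ompi_communicator_t *comm, int target)
{
    if (mpi_thread_multiple_enabled) {
        return NULL;  /* multi-threaded: let the wpool layer pick a per-thread worker/ep */
    }
    /* single-threaded: reuse the endpoint already cached for this peer */
    return (ucp_ep_h *)&(OSC_UCX_GET_EP(comm, target));
}

The resolved pointer is then threaded through the opal_common_ucx_wpmem_post/fetch/cmpswp calls, as the osc_ucx_active_target.c diff below shows.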

ompi/mca/osc/ucx/osc_ucx_active_target.c

Lines changed: 10 additions & 6 deletions
@@ -165,25 +165,28 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
     int i, size;
     int ret = OMPI_SUCCESS;
+    ucp_ep_h *ep;
 
     if (module->epoch_type.access != START_COMPLETE_EPOCH) {
         return OMPI_ERR_RMA_SYNC;
     }
 
-    module->epoch_type.access = NONE_EPOCH;
-
     ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
     if (ret != OMPI_SUCCESS) {
         return ret;
     }
 
+    module->epoch_type.access = NONE_EPOCH;
+
     size = ompi_group_size(module->start_group);
     for (i = 0; i < size; i++) {
         uint64_t remote_addr = module->state_addrs[module->start_grp_ranks[i]] + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET; // write to state.complete_count on remote side
 
+        OSC_UCX_GET_DEFAULT_EP(ep, module->comm, module->start_grp_ranks[i]);
+
         ret = opal_common_ucx_wpmem_post(module->state_mem, UCP_ATOMIC_POST_OP_ADD,
                                          1, module->start_grp_ranks[i], sizeof(uint64_t),
-                                         remote_addr);
+                                         remote_addr, ep);
         if (ret != OMPI_SUCCESS) {
             OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_post failed: %d", ret);
         }
@@ -204,6 +207,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
 
 int ompi_osc_ucx_post(struct ompi_group_t *group, int mpi_assert, struct ompi_win_t *win) {
     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
+    ucp_ep_h *ep;
    int ret = OMPI_SUCCESS;
 
     if (module->epoch_type.exposure != NONE_EPOCH) {
@@ -243,12 +247,12 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int mpi_assert, struct ompi_wi
         uint64_t remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_INDEX_OFFSET; // write to state.post_index on remote side
         uint64_t curr_idx = 0, result = 0;
 
-
+        OSC_UCX_GET_DEFAULT_EP(ep, module->comm, ranks_in_win_grp[i]);
 
         /* do fop first to get an post index */
         ret = opal_common_ucx_wpmem_fetch(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
                                           1, ranks_in_win_grp[i], &result,
-                                          sizeof(result), remote_addr);
+                                          sizeof(result), remote_addr, ep);
 
         if (ret != OMPI_SUCCESS) {
             ret = OMPI_ERROR;
@@ -265,7 +269,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int mpi_assert, struct ompi_wi
             result = myrank + 1;
             ret = opal_common_ucx_wpmem_cmpswp(module->state_mem, 0, result,
                                                ranks_in_win_grp[i], &result, sizeof(result),
-                                               remote_addr);
+                                               remote_addr, ep);
 
             if (ret != OMPI_SUCCESS) {
                 ret = OMPI_ERROR;
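The OSC_UCX_OUTSTANDING_OPS_FLUSH_THRESHOLD constant (64) added in osc_ucx.h hints at how the nonblocking accumulate path stays bounded: operations are queued without waiting for completion, and a flush is forced once too many are in flight. A minimal sketch of that throttling idea, assuming a caller-maintained counter (the helper and counter are hypothetical; the commit's actual bookkeeping may differ):

/* Hypothetical sketch of threshold-based flushing for nonblocking accumulates. */
static inline int osc_ucx_maybe_flush(ompi_osc_ucx_module_t *module,
                                      unsigned *outstanding /* hypothetical counter */)
{
    if (++(*outstanding) < OSC_UCX_OUTSTANDING_OPS_FLUSH_THRESHOLD) {
        return OMPI_SUCCESS;   /* keep queueing: the operation stays nonblocking */
    }
    *outstanding = 0;
    /* Same flush entry point ompi_osc_ucx_complete() uses in the diff above. */
    return opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_WORKER,
                                     0 /* ignore */);
}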