Skip to content

Commit a2d2993

Browse files
author
Mamzi Bayatpour [email protected] ()
committed
Using a separate lock for handling the dynamic windows instead of
accumulate lock + some code reorganizations
1 parent 6fe065a commit a2d2993

File tree

3 files changed

+102
-61
lines changed

3 files changed

+102
-61
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ typedef struct ompi_osc_ucx_epoch_type {
8282
#define OSC_UCX_STATE_COMPLETE_COUNT_OFFSET (sizeof(uint64_t) * 3)
8383
#define OSC_UCX_STATE_POST_INDEX_OFFSET (sizeof(uint64_t) * 4)
8484
#define OSC_UCX_STATE_POST_STATE_OFFSET (sizeof(uint64_t) * 5)
85-
#define OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET (sizeof(uint64_t) * (5 + OMPI_OSC_UCX_POST_PEER_MAX))
85+
#define OSC_UCX_STATE_DYNAMIC_LOCK_OFFSET (sizeof(uint64_t) * 6)
86+
#define OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET (sizeof(uint64_t) * (6 + OMPI_OSC_UCX_POST_PEER_MAX))
8687

8788
typedef struct ompi_osc_dynamic_win_info {
8889
uint64_t base;
@@ -267,11 +268,13 @@ int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_
267268
int min_index, int max_index,
268269
uint64_t base, size_t len, int *insert);
269270
extern inline bool ompi_osc_need_acc_lock(ompi_osc_ucx_module_t *module, int target);
270-
extern inline int ompi_osc_ucx_state_lock(ompi_osc_ucx_module_t *module, int target,
271-
bool *lock_acquired, bool force_lock);
272-
extern inline int ompi_osc_ucx_state_unlock(ompi_osc_ucx_module_t *module, int target,
271+
extern inline int ompi_osc_ucx_acc_lock(ompi_osc_ucx_module_t *module, int target,
272+
bool *lock_acquired);
273+
extern inline int ompi_osc_ucx_acc_unlock(ompi_osc_ucx_module_t *module, int target,
273274
bool lock_acquired, void *free_ptr);
274275
extern inline int ompi_osc_ucx_nonblocking_ops_finalize(ompi_osc_ucx_module_t *module, int target,
275276
bool lock_acquired, struct ompi_win_t *win, void *free_ptr);
277+
extern inline int ompi_osc_ucx_dynamic_lock(ompi_osc_ucx_module_t *module, int target);
278+
extern inline int ompi_osc_ucx_dynamic_unlock(ompi_osc_ucx_module_t *module, int target);
276279

277280
#endif /* OMPI_OSC_UCX_H */

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
return OMPI_ERROR; \
2828
}
2929

30-
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret, _lock_required) \
30+
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret) \
3131
if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
32-
_ret = get_dynamic_win_info(_remote_addr, _module, _target, _lock_required); \
32+
_ret = get_dynamic_win_info(_remote_addr, _module, _target); \
3333
if (_ret != OMPI_SUCCESS) { \
3434
return _ret; \
3535
} \
@@ -257,8 +257,8 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
257257
return ret;
258258
}
259259

260-
static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module,
261-
int target, bool lock_required) {
260+
static inline int get_dynamic_win_info(uint64_t remote_addr,
261+
ompi_osc_ucx_module_t *module, int target) {
262262
ucp_ep_h *ep;
263263
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
264264
uint64_t remote_state_addr = (module->state_addrs)[target] + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET;
@@ -270,15 +270,13 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
270270
int insert = -1;
271271
int ret;
272272

273-
bool lock_acquired = false;
274-
if (lock_required) {
275-
/* We need to lock acc-lock even if the process has an exclusive lock.
276-
* Therefore, force lock is needed. Remote process protects its window
277-
* attach/detach operations with an acc-lock */
278-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, true);
279-
if (ret != OMPI_SUCCESS) {
280-
return ret;
281-
}
273+
/* We need to lock dyn-lock even if the process has an exclusive lock.
274+
* Remote process protects its window attach/detach operations with a
275+
* dynamic lock */
276+
ret = ompi_osc_ucx_dynamic_lock(module, target);
277+
if (ret != OPAL_SUCCESS) {
278+
ret = OMPI_ERROR;
279+
goto cleanup;
282280
}
283281

284282
ret = opal_common_ucx_wpmem_putget(module->state_mem, OPAL_COMMON_UCX_GET, target,
@@ -358,9 +356,8 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
358356
cleanup:
359357
free(temp_buf);
360358

361-
ompi_osc_ucx_state_unlock(module, target, lock_acquired, NULL);
362-
363-
return ret;
359+
/* unlock the dynamic lock */
360+
return ompi_osc_ucx_dynamic_unlock(module, target);
364361
}
365362

366363
static inline
@@ -434,7 +431,7 @@ static int do_atomic_op_intrinsic(
434431

435432
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
436433

437-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
434+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
438435

439436
ucp_atomic_fetch_op_t opcode;
440437
bool is_no_op = false;
@@ -505,7 +502,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
505502
return ret;
506503
}
507504

508-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
505+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
509506

510507
if (!target_count) {
511508
return OMPI_SUCCESS;
@@ -557,7 +554,7 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
557554
return ret;
558555
}
559556

560-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
557+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
561558

562559
if (!target_count) {
563560
return OMPI_SUCCESS;
@@ -625,12 +622,12 @@ int accumulate_req(const void *origin_addr, int origin_count,
625622
}
626623

627624
/* Start atomicity by acquiring acc lock */
628-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
625+
ret = ompi_osc_ucx_acc_lock(module, target, &lock_acquired);
629626
if (ret != OMPI_SUCCESS) {
630627
return ret;
631628
}
632629

633-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
630+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
634631

635632
if (op == &ompi_mpi_op_replace.op) {
636633
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target,
@@ -738,7 +735,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
738735
ompi_request_complete(&ucx_req->super.super, true);
739736
}
740737

741-
return ompi_osc_ucx_state_unlock(module, target, lock_acquired, free_ptr);
738+
return ompi_osc_ucx_acc_unlock(module, target, lock_acquired, free_ptr);
742739
}
743740

744741
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
@@ -776,13 +773,13 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
776773
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
777774
if (!module->acc_single_intrinsic) {
778775
/* Start atomicity by acquiring acc lock */
779-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
776+
ret = ompi_osc_ucx_acc_lock(module, target, &lock_acquired);
780777
if (ret != OMPI_SUCCESS) {
781778
return ret;
782779
}
783780
}
784781

785-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
782+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
786783

787784
ompi_datatype_type_size(dt, &dt_bytes);
788785
uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes);
@@ -795,7 +792,7 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
795792
return ret;
796793
}
797794

798-
return ompi_osc_ucx_state_unlock(module, target, lock_acquired, NULL);
795+
return ompi_osc_ucx_acc_unlock(module, target, lock_acquired, NULL);
799796
}
800797

801798
int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
@@ -827,12 +824,12 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
827824
/* fall back to get-compare-put */
828825

829826
/* Start atomicity by acquiring acc lock */
830-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
827+
ret = ompi_osc_ucx_acc_lock(module, target, &lock_acquired);
831828
if (ret != OMPI_SUCCESS) {
832829
return ret;
833830
}
834831

835-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
832+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
836833

837834
ret = opal_common_ucx_wpmem_putget(mem, OPAL_COMMON_UCX_GET, target,
838835
result_addr, dt_bytes, remote_addr, ep);
@@ -856,7 +853,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
856853
}
857854
}
858855

859-
return ompi_osc_ucx_state_unlock(module, target, lock_acquired, NULL);
856+
return ompi_osc_ucx_acc_unlock(module, target, lock_acquired, NULL);
860857
}
861858

862859
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
@@ -885,13 +882,13 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
885882

886883
if (!module->acc_single_intrinsic) {
887884
/* Start atomicity by acquiring acc lock */
888-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
885+
ret = ompi_osc_ucx_acc_lock(module, target, &lock_acquired);
889886
if (ret != OMPI_SUCCESS) {
890887
return ret;
891888
}
892889
}
893890

894-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
891+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
895892

896893
value = origin_addr ? opal_common_ucx_load_uint64(origin_addr, dt_bytes) : 0;
897894

@@ -912,7 +909,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
912909
return ret;
913910
}
914911

915-
return ompi_osc_ucx_state_unlock(module, target, lock_acquired, NULL);
912+
return ompi_osc_ucx_acc_unlock(module, target, lock_acquired, NULL);
916913
} else {
917914
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
918915
target, target_disp, 1, dt, op, win);
@@ -949,12 +946,12 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
949946
}
950947

951948
/* Start atomicity by acquiring acc lock */
952-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
949+
ret = ompi_osc_ucx_acc_lock(module, target, &lock_acquired);
953950
if (ret != OMPI_SUCCESS) {
954951
return ret;
955952
}
956953

957-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
954+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
958955

959956
ret = ompi_osc_ucx_get(result_addr, result_count, result_dt, target,
960957
target_disp, target_count, target_dt, win);
@@ -1071,7 +1068,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10711068
}
10721069

10731070

1074-
return ompi_osc_ucx_state_unlock(module, target, lock_acquired, free_addr);
1071+
return ompi_osc_ucx_acc_unlock(module, target, lock_acquired, free_addr);
10751072
}
10761073

10771074
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
@@ -1118,7 +1115,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11181115
return ret;
11191116
}
11201117

1121-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
1118+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
11221119

11231120
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target, target_disp,
11241121
target_count, target_dt, win);
@@ -1174,7 +1171,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11741171
return ret;
11751172
}
11761173

1177-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
1174+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
11781175

11791176
ret = ompi_osc_ucx_get(origin_addr, origin_count, origin_dt, target, target_disp,
11801177
target_count, target_dt, win);
@@ -1291,7 +1288,7 @@ static inline int ompi_osc_ucx_acc_rputget(void *stage_addr, int stage_count,
12911288
ompi_osc_ucx_accumulate_request_t *ucx_req = NULL;
12921289
bool sync_check;
12931290
int ret = OMPI_SUCCESS;
1294-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
1291+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
12951292

12961293
if (acc_type != NONE) {
12971294
OMPI_OSC_UCX_ACCUMULATE_REQUEST_ALLOC(win, ucx_req);
@@ -1406,7 +1403,7 @@ static int ompi_osc_ucx_get_accumulate_nonblocking(const void *origin_addr, int
14061403
}
14071404

14081405
/* Start atomicity by acquiring acc lock */
1409-
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
1406+
ret = ompi_osc_ucx_acc_lock(module, target, &lock_acquired);
14101407
if (ret != OMPI_SUCCESS) {
14111408
return ret;
14121409
}
@@ -1419,7 +1416,7 @@ static int ompi_osc_ucx_get_accumulate_nonblocking(const void *origin_addr, int
14191416
}
14201417
}
14211418

1422-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
1419+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
14231420

14241421
if (result_addr != NULL) {
14251422
/* This is a get-accumulate operation, so read the target data into result addr */

0 commit comments

Comments
 (0)