@@ -40,6 +40,8 @@ typedef struct ucx_iovec {
4040 size_t len ;
4141} ucx_iovec_t ;
4242
43+ int outstanding_ops_flush_threshold = 64 ; /* Limit on mca_osc_ucx_component.num_incomplete_req_ops: once the count exceeds this value, the non-blocking get-accumulate path issues a worker-scope context flush. NOTE(review): non-static and mutable, presumably so it can be set from outside this file -- confirm. */
44+
4345static inline int check_sync_state (ompi_osc_ucx_module_t * module , int target ,
4446 bool is_req_ops ) {
4547
@@ -245,12 +247,6 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
245247 }
246248
247249cleanup :
248- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
249- if (ret != OPAL_SUCCESS ) {
250- ret = OMPI_ERROR ;
251- goto cleanup ;
252- }
253-
254250 if (origin_ucx_iov != NULL ) {
255251 free (origin_ucx_iov );
256252 }
@@ -294,7 +290,8 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
294290 goto cleanup ;
295291 }
296292
297- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
293+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
294+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
298295 if (ret != OPAL_SUCCESS ) {
299296 ret = OMPI_ERROR ;
300297 goto cleanup ;
@@ -674,7 +671,8 @@ int accumulate_req(const void *origin_addr, int origin_count,
674671 return ret ;
675672 }
676673
677- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
674+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
675+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
678676 if (ret != OMPI_SUCCESS ) {
679677 free (temp_addr );
680678 return ret ;
@@ -734,7 +732,8 @@ int accumulate_req(const void *origin_addr, int origin_count,
734732
735733 }
736734
737- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
735+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
736+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
738737 if (ret != OPAL_SUCCESS ) {
739738 return ret ;
740739 }
@@ -847,7 +846,8 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
847846 return OMPI_ERROR ;
848847 }
849848
850- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
849+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
850+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
851851 if (ret != OPAL_SUCCESS ) {
852852 return ret ;
853853 }
@@ -970,7 +970,8 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
970970
971971 if (op != & ompi_mpi_op_no_op .op ) {
972972 if (op == & ompi_mpi_op_replace .op ) {
973- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
973+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
974+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
974975 if (ret != OMPI_SUCCESS ) {
975976 return ret ;
976977 }
@@ -1010,7 +1011,8 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10101011 return ret ;
10111012 }
10121013
1013- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
1014+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
1015+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
10141016 if (ret != OMPI_SUCCESS ) {
10151017 return ret ;
10161018 }
@@ -1066,7 +1068,8 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10661068 }
10671069 }
10681070
1069- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
1071+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP ,
1072+ & mca_osc_ucx_component .num_incomplete_req_ops , target );
10701073 if (ret != OPAL_SUCCESS ) {
10711074 return ret ;
10721075 }
@@ -1134,7 +1137,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11341137
11351138 OMPI_OSC_UCX_REQUEST_ALLOC (win , ucx_req );
11361139
1137- mca_osc_ucx_component . num_incomplete_req_ops ++ ;
1140+ INCREMENT_OUTSTANDING_NB_OPS ;
11381141 ret = opal_common_ucx_wpmem_flush_ep_nb (mem , target , req_completion , ucx_req , ep );
11391142
11401143 if (ret != OMPI_SUCCESS ) {
@@ -1189,7 +1192,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11891192
11901193 OMPI_OSC_UCX_REQUEST_ALLOC (win , ucx_req );
11911194
1192- mca_osc_ucx_component . num_incomplete_req_ops ++ ;
1195+ INCREMENT_OUTSTANDING_NB_OPS ;
11931196 ret = opal_common_ucx_wpmem_flush_ep_nb (mem , target , req_completion , ucx_req , ep );
11941197
11951198 if (ret != OMPI_SUCCESS ) {
@@ -1308,14 +1311,21 @@ static inline int ompi_osc_ucx_acc_rputget(void *stage_addr, int stage_count,
13081311 ucx_req -> acc .win = win ;
13091312 ucx_req -> acc .origin_addr = origin_addr ;
13101313 ucx_req -> acc .origin_count = origin_count ;
1311- ucx_req -> acc .origin_dt = origin_dt ;
1314+ if (origin_dt != NULL ) {
1315+ MPI_Type_dup (origin_dt , & ucx_req -> acc .origin_dt );
1316+ }
13121317 ucx_req -> acc .stage_addr = stage_addr ;
13131318 ucx_req -> acc .stage_count = stage_count ;
1314- ucx_req -> acc .stage_dt = stage_dt ;
1319+ if (stage_dt != NULL ) {
1320+ MPI_Type_dup (stage_dt , & ucx_req -> acc .stage_dt );
1321+ }
13151322 ucx_req -> acc .target = target ;
1316- ucx_req -> acc .target_dt = target_dt ;
1323+ if (target_dt != NULL ) {
1324+ MPI_Type_dup (target_dt , & ucx_req -> acc .target_dt );
1325+ }
13171326 ucx_req -> acc .target_disp = target_disp ;
13181327 ucx_req -> acc .target_count = target_count ;
1328+ ucx_req -> acc .free_ptr = NULL ;
13191329 }
13201330 sync_check = module -> skip_sync_check ;
13211331 module -> skip_sync_check = true; /* we already hold the acc lock, so no need for sync check*/
@@ -1333,7 +1343,7 @@ static inline int ompi_osc_ucx_acc_rputget(void *stage_addr, int stage_count,
13331343
13341344 module -> skip_sync_check = sync_check ;
13351345 if (acc_type != NONE ) {
1336- mca_osc_ucx_component . num_incomplete_req_ops ++ ;
1346+ INCREMENT_OUTSTANDING_NB_OPS ;
13371347 ret = opal_common_ucx_wpmem_flush_ep_nb (mem , target , req_completion , ucx_req , ep );
13381348
13391349 if (ret != OMPI_SUCCESS ) {
@@ -1398,8 +1408,9 @@ static int ompi_osc_ucx_get_accumulate_nonblocking(const void *origin_addr, int
13981408 return ret ;
13991409 }
14001410
1401- if (mca_osc_ucx_component .num_incomplete_req_ops > OSC_UCX_OUTSTANDING_OPS_FLUSH_THRESHOLD ) {
1402- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_WORKER , 0 );
1411+ if (mca_osc_ucx_component .num_incomplete_req_ops > outstanding_ops_flush_threshold ) {
1412+ ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_WORKER ,
1413+ & mca_osc_ucx_component .num_incomplete_req_ops , 0 );
14031414 if (ret != OPAL_SUCCESS ) {
14041415 ret = OMPI_ERROR ;
14051416 return ret ;
@@ -1470,6 +1481,7 @@ void req_completion(void *request) {
14701481
14711482 if (req -> acc .acc_type != NONE ) {
14721483 assert (req -> acc .phase != ACC_INIT );
1484+ void * free_addr = NULL ;
14731485 bool release_lock = false;
14741486 ptrdiff_t temp_lb , temp_extent ;
14751487 const void * origin_addr = req -> acc .origin_addr ;
@@ -1485,7 +1497,30 @@ void req_completion(void *request) {
14851497 struct ompi_win_t * win = req -> acc .win ;
14861498 struct ompi_op_t * op = req -> acc .op ;
14871499
1500+ if (req -> acc .phase != ACC_FINALIZE ) {
1501+ /* Avoid calling flush while we are already in progress */
1502+ req -> acc .module -> mem -> skip_periodic_flush = true;
1503+ req -> acc .module -> state_mem -> skip_periodic_flush = true;
1504+ }
1505+
14881506 switch (req -> acc .phase ) {
1507+ case ACC_FINALIZE :
1508+ {
1509+ if (req -> acc .free_ptr != NULL ) {
1510+ free (req -> acc .free_ptr );
1511+ req -> acc .free_ptr = NULL ;
1512+ }
1513+ if (origin_dt != NULL ) {
1514+ ompi_datatype_destroy (& origin_dt );
1515+ }
1516+ if (target_dt != NULL ) {
1517+ ompi_datatype_destroy (& target_dt );
1518+ }
1519+ if (temp_dt != NULL ) {
1520+ ompi_datatype_destroy (& temp_dt );
1521+ }
1522+ break ;
1523+ }
14891524 case ACC_GET_RESULTS_DATA :
14901525 {
14911526 /* This is a get-accumulate operation */
@@ -1494,6 +1529,7 @@ void req_completion(void *request) {
14941529 * acc lock and return */
14951530 release_lock = true;
14961531 } else if (op == & ompi_mpi_op_replace .op ) {
1532+ assert (target_dt != NULL && origin_dt != NULL );
14971533 /* Now that we have the results data, replace the target
14981534 * buffer with origin buffer and then release the lock */
14991535 ret = ompi_osc_ucx_acc_rputget (NULL , 0 , NULL , target , target_disp ,
@@ -1520,6 +1556,7 @@ void req_completion(void *request) {
15201556 case ACC_GET_STAGE_DATA :
15211557 {
15221558 assert (op != & ompi_mpi_op_replace .op && op != & ompi_mpi_op_no_op .op );
1559+ assert (origin_dt != NULL && temp_dt != NULL );
15231560
15241561 bool is_origin_contig =
15251562 ompi_datatype_is_contiguous_memory_layout (origin_dt , origin_count );
@@ -1593,6 +1630,7 @@ void req_completion(void *request) {
15931630 abort ();
15941631 }
15951632 release_lock = true;
1633+ free_addr = temp_addr ;
15961634 break ;
15971635 }
15981636
@@ -1605,12 +1643,17 @@ void req_completion(void *request) {
16051643
16061644 if (release_lock ) {
16071645 /* Ordering between previous put/get operations and unlock will be realized
1608- * through the ucp fence inside the state unlock function */
1609- ompi_osc_ucx_state_unlock_nb (req -> acc .module , target , req -> acc .lock_acquired , win );
1646+ * through the ucp fence inside the finalize function */
1647+ ompi_osc_ucx_nonblocking_ops_finalize (req -> acc .module , target , req -> acc .lock_acquired , win , free_addr );
1648+ }
1649+
1650+ if (req -> acc .phase != ACC_FINALIZE ) {
1651+ req -> acc .module -> mem -> skip_periodic_flush = false;
1652+ req -> acc .module -> state_mem -> skip_periodic_flush = false;
16101653 }
16111654 }
16121655
1613- mca_osc_ucx_component . num_incomplete_req_ops -- ;
1656+ DECREMENT_OUTSTANDING_NB_OPS ;
16141657 ompi_request_complete (& (req -> super ), true);
16151658 assert (mca_osc_ucx_component .num_incomplete_req_ops >= 0 );
16161659}
0 commit comments