@@ -32,6 +32,8 @@ __thread int initialized = 0;
3232#endif
3333
3434bool opal_common_ucx_thread_enabled = false;
35+ opal_atomic_int64_t opal_common_ucx_ep_counts = 0 ;
36+ opal_atomic_int64_t opal_common_ucx_unpacked_rkey_counts = 0 ;
3537
3638static _ctx_record_t * _tlocal_add_ctx_rec (opal_common_ucx_ctx_t * ctx );
3739static inline _ctx_record_t * _tlocal_get_ctx_rec (opal_tsd_tracked_key_t tls_key );
@@ -102,6 +104,7 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
102104 for (i = 0 ; i < winfo -> comm_size ; i ++ ) {
103105 if (NULL != winfo -> endpoints [i ]) {
104106 ucp_ep_destroy (winfo -> endpoints [i ]);
107+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_ep_counts , -1 );
105108 }
106109 assert (winfo -> inflight_ops [i ] == 0 );
107110 }
@@ -326,9 +329,26 @@ static opal_common_ucx_winfo_t *_wpool_get_winfo(opal_common_ucx_wpool_t *wpool,
326329 return winfo ;
327330}
328331
332+ /* Remove the winfo from active workers and add it to idle workers */
329333static void _wpool_put_winfo (opal_common_ucx_wpool_t * wpool , opal_common_ucx_winfo_t * winfo )
330334{
331335 opal_mutex_lock (& wpool -> mutex );
336+ if (winfo -> comm_size != 0 ) {
337+ size_t i ;
338+ if (opal_common_ucx_thread_enabled ) {
339+ for (i = 0 ; i < winfo -> comm_size ; i ++ ) {
340+ if (NULL != winfo -> endpoints [i ]) {
341+ ucp_ep_destroy (winfo -> endpoints [i ]);
342+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_ep_counts , -1 );
343+ }
344+ assert (winfo -> inflight_ops [i ] == 0 );
345+ }
346+ }
347+ free (winfo -> endpoints );
348+ free (winfo -> inflight_ops );
349+ }
350+ winfo -> endpoints = NULL ;
351+ winfo -> comm_size = 0 ;
332352 opal_list_remove_item (& wpool -> active_workers , & winfo -> super );
333353 opal_list_prepend (& wpool -> idle_workers , & winfo -> super );
334354 opal_mutex_unlock (& wpool -> mutex );
@@ -634,6 +654,7 @@ static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
634654 memset (& ep_params , 0 , sizeof (ucp_ep_params_t ));
635655 ep_params .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS ;
636656
657+ assert (winfo -> endpoints [target ] == NULL );
637658 opal_mutex_lock (& winfo -> mutex );
638659 displ = gctx -> recv_worker_displs [target ];
639660 ep_params .address = (ucp_address_t * ) & (gctx -> recv_worker_addrs [displ ]);
@@ -643,7 +664,9 @@ static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
643664 opal_mutex_unlock (& winfo -> mutex );
644665 return OPAL_ERROR ;
645666 }
667+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_ep_counts , 1 );
646668 opal_mutex_unlock (& winfo -> mutex );
669+ assert (winfo -> endpoints [target ] != NULL );
647670 return OPAL_SUCCESS ;
648671}
649672
@@ -664,6 +687,7 @@ static void _tlocal_mem_rec_cleanup(_mem_record_t *mem_rec)
664687 for (i = 0 ; i < mem_rec -> gmem -> ctx -> comm_size ; i ++ ) {
665688 if (mem_rec -> rkeys [i ]) {
666689 ucp_rkey_destroy (mem_rec -> rkeys [i ]);
690+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_unpacked_rkey_counts , -1 );
667691 }
668692 }
669693 opal_mutex_unlock (& mem_rec -> winfo -> mutex );
@@ -703,6 +727,7 @@ static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int targ
703727
704728 opal_mutex_lock (& mem_rec -> winfo -> mutex );
705729 status = ucp_ep_rkey_unpack (ep , & gmem -> mem_addrs [displ ], & mem_rec -> rkeys [target ]);
730+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_unpacked_rkey_counts , 1 );
706731 opal_mutex_unlock (& mem_rec -> winfo -> mutex );
707732 if (status != UCS_OK ) {
708733 MCA_COMMON_UCX_VERBOSE (1 , "ucp_ep_rkey_unpack failed: %d" , status );
0 commit comments