Skip to content

Commit 688a0bb

Browse files
committed
oshmem: Add symmetric remote key storage structure
After unpacking, we try to find an already existing equivalent remote key. When found, we destroy the new one and use the old one instead. Signed-off-by: Thomas Vegas <[email protected]>
1 parent 436adbc commit 688a0bb

File tree

4 files changed

+211
-20
lines changed

4 files changed

+211
-20
lines changed

config/ompi_check_ucx.m4

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ AC_DEFUN([OMPI_CHECK_UCX],[
124124
[#include <ucp/api/ucp.h>])
125125
AC_CHECK_DECLS([ucp_tag_send_nbx,
126126
ucp_tag_send_sync_nbx,
127-
ucp_tag_recv_nbx],
127+
ucp_tag_recv_nbx,
128+
ucp_rkey_compare],
128129
[], [],
129130
[#include <ucp/api/ucp.h>])
130131
AC_CHECK_TYPES([ucp_request_param_t],

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,166 @@ mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx)
138138
return 0;
139139
}
140140

141+
void mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store)
142+
{
143+
store->array = NULL;
144+
store->count = 0;
145+
store->size = 0;
146+
}
147+
148+
void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store)
149+
{
150+
int i;
151+
152+
for (i = 0; i < store->count; i++) {
153+
if (store->array[i].refcnt != 0) {
154+
SPML_UCX_ERROR("rkey store destroy: %d/%d has refcnt %d > 0",
155+
i, store->count, store->array[i].refcnt);
156+
}
157+
158+
ucp_rkey_destroy(store->array[i].rkey);
159+
}
160+
161+
free(store->array);
162+
}
163+
164+
/**
165+
* Find position in sorted array for existing or future entry
166+
*
167+
* @param[in] store Store of the entries
168+
* @param[in] worker Common worker for rkeys used
169+
* @param[in] rkey Remote key to search for
170+
* @param[out] index Index of entry
171+
*
172+
* @return
173+
* OSHMEM_ERR_NOT_FOUND: index contains the position where future element
174+
* should be inserted to keep array sorted
175+
* OSHMEM_SUCCESS : index contains the position of the element
176+
* Other error : index is not valid
177+
*/
178+
static int mca_spml_ucx_rkey_store_find(const mca_spml_ucx_rkey_store_t *store,
179+
const ucp_worker_h worker,
180+
const ucp_rkey_h rkey,
181+
int *index)
182+
{
183+
#if HAVE_DECL_UCP_RKEY_COMPARE
184+
ucp_rkey_compare_params_t params;
185+
int i, result, m, end;
186+
ucs_status_t status;
187+
188+
for (i = 0, end = store->count; i < end;) {
189+
m = (i + end) / 2;
190+
191+
params.field_mask = 0;
192+
status = ucp_rkey_compare(worker, store->array[m].rkey,
193+
rkey, &params, &result);
194+
if (status != UCS_OK) {
195+
return OSHMEM_ERROR;
196+
} else if (result == 0) {
197+
*index = m;
198+
return OSHMEM_SUCCESS;
199+
} else if (result > 0) {
200+
end = m;
201+
} else {
202+
i = m + 1;
203+
}
204+
}
205+
206+
*index = i;
207+
return OSHMEM_ERR_NOT_FOUND;
208+
#else
209+
return OSHMEM_ERROR;
210+
#endif
211+
}
212+
213+
static void mca_spml_ucx_rkey_store_insert(mca_spml_ucx_rkey_store_t *store,
214+
int i, ucp_rkey_h rkey)
215+
{
216+
int size;
217+
mca_spml_ucx_rkey_t *tmp;
218+
219+
if (store->count >= mca_spml_ucx.symmetric_rkey_max_count) {
220+
return;
221+
}
222+
223+
if (store->count >= store->size) {
224+
size = store->size * 2;
225+
if (size == 0) {
226+
size = 16;
227+
}
228+
229+
if (size > mca_spml_ucx.symmetric_rkey_max_count) {
230+
size = mca_spml_ucx.symmetric_rkey_max_count;
231+
}
232+
233+
tmp = realloc(store->array, size * sizeof(*store->array));
234+
if (tmp == NULL) {
235+
return;
236+
}
237+
238+
store->array = tmp;
239+
store->size = size;
240+
}
241+
242+
memmove(&store->array[i + 1], &store->array[i],
243+
(store->count - i) * sizeof(*store->array));
244+
store->array[i].rkey = rkey;
245+
store->array[i].refcnt = 1;
246+
store->count++;
247+
return;
248+
}
249+
250+
/* Takes ownership of input ucp remote key */
251+
static ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store,
252+
ucp_worker_h worker,
253+
ucp_rkey_h rkey)
254+
{
255+
int ret, i;
256+
257+
if (mca_spml_ucx.symmetric_rkey_max_count < 1) {
258+
return rkey;
259+
}
260+
261+
ret = mca_spml_ucx_rkey_store_find(store, worker, rkey, &i);
262+
if (ret == OSHMEM_SUCCESS) {
263+
ucp_rkey_destroy(rkey);
264+
store->array[i].refcnt++;
265+
return store->array[i].rkey;
266+
}
267+
268+
if (ret == OSHMEM_ERR_NOT_FOUND) {
269+
mca_spml_ucx_rkey_store_insert(store, i, rkey);
270+
}
271+
272+
return rkey;
273+
}
274+
275+
static void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store,
276+
ucp_worker_h worker,
277+
ucp_rkey_h rkey)
278+
{
279+
mca_spml_ucx_rkey_t *entry;
280+
int ret, i;
281+
282+
ret = mca_spml_ucx_rkey_store_find(store, worker, rkey, &i);
283+
if (ret != OSHMEM_SUCCESS) {
284+
goto out;
285+
}
286+
287+
entry = &store->array[i];
288+
assert(entry->rkey == rkey);
289+
if (--entry->refcnt > 0) {
290+
return;
291+
}
292+
293+
memmove(&store->array[i], &store->array[i + 1],
294+
(store->count - (i + 1)) * sizeof(*store->array));
295+
store->count--;
296+
297+
out:
298+
ucp_rkey_destroy(rkey);
299+
}
300+
141301
int mca_spml_ucx_enable(bool enable)
142302
{
143303
SPML_UCX_VERBOSE(50, "*** ucx ENABLED ****");
@@ -930,6 +1090,8 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
9301090
}
9311091
}
9321092

1093+
mca_spml_ucx_rkey_store_create(&ucx_ctx->rkey_store);
1094+
9331095
*ucx_ctx_p = ucx_ctx;
9341096

9351097
return OSHMEM_SUCCESS;

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,31 @@ struct ucp_peer {
7676
size_t mkeys_cnt;
7777
};
7878
typedef struct ucp_peer ucp_peer_t;
79-
79+
80+
/* An rkey_store entry */
81+
typedef struct mca_spml_ucx_rkey {
82+
ucp_rkey_h rkey;
83+
int refcnt;
84+
} mca_spml_ucx_rkey_t;
85+
86+
typedef struct mca_spml_ucx_rkey_store {
87+
mca_spml_ucx_rkey_t *array;
88+
int size;
89+
int count;
90+
} mca_spml_ucx_rkey_store_t;
91+
8092
struct mca_spml_ucx_ctx {
81-
ucp_worker_h *ucp_worker;
82-
ucp_peer_t *ucp_peers;
83-
long options;
84-
opal_bitmap_t put_op_bitmap;
85-
unsigned long nb_progress_cnt;
86-
unsigned int ucp_workers;
87-
int *put_proc_indexes;
88-
unsigned put_proc_count;
89-
bool synchronized_quiet;
90-
int strong_sync;
93+
ucp_worker_h *ucp_worker;
94+
ucp_peer_t *ucp_peers;
95+
long options;
96+
opal_bitmap_t put_op_bitmap;
97+
unsigned long nb_progress_cnt;
98+
unsigned int ucp_workers;
99+
int *put_proc_indexes;
100+
unsigned put_proc_count;
101+
bool synchronized_quiet;
102+
int strong_sync;
103+
mca_spml_ucx_rkey_store_t rkey_store;
91104
};
92105
typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
93106

@@ -284,6 +297,9 @@ extern int mca_spml_ucx_team_reduce(shmem_team_t team, void
284297
extern unsigned
285298
mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx);
286299

300+
extern void mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store);
301+
extern void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store);
302+
287303
static inline int
288304
mca_spml_ucx_peer_mkey_get(ucp_peer_t *ucp_peer, int index, spml_ucx_cached_mkey_t **out_rmkey)
289305
{

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ static int mca_spml_ucx_component_close(void)
260260

261261
static int spml_ucx_init(void)
262262
{
263-
unsigned int i;
263+
unsigned int i, ret;
264264
ucs_status_t err;
265265
ucp_config_t *ucp_config;
266266
ucp_params_t params;
@@ -336,6 +336,8 @@ static int spml_ucx_init(void)
336336
mca_spml_ucx_ctx_default.ucp_workers++;
337337
}
338338

339+
mca_spml_ucx_rkey_store_create(&mca_spml_ucx_ctx_default.rkey_store);
340+
339341
wrk_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
340342
err = ucp_worker_query(mca_spml_ucx_ctx_default.ucp_worker[0], &wrk_attr);
341343

@@ -444,6 +446,7 @@ static int mca_spml_ucx_component_fini(void)
444446
{
445447
int fenced = 0, i;
446448
int ret = OSHMEM_SUCCESS;
449+
mca_spml_ucx_ctx_t *ctx;
447450

448451
opal_progress_unregister(spml_ucx_default_progress);
449452
if (mca_spml_ucx.active_array.ctxs_count) {
@@ -496,27 +499,36 @@ static int mca_spml_ucx_component_fini(void)
496499
}
497500
}
498501

499-
/* delete all workers */
502+
/* delete all workers and rkey_stores */
500503
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
501-
ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker[0]);
502-
free(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
503-
free(mca_spml_ucx.active_array.ctxs[i]);
504+
ctx = mca_spml_ucx.active_array.ctxs[i];
505+
506+
mca_spml_ucx_rkey_store_destroy(&ctx->rkey_store);
507+
ucp_worker_destroy(ctx->ucp_worker[0]);
508+
free(ctx->ucp_worker);
509+
free(ctx);
504510
}
505511

506512
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
507-
ucp_worker_destroy(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker[0]);
508-
free(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
509-
free(mca_spml_ucx.idle_array.ctxs[i]);
513+
ctx = mca_spml_ucx.idle_array.ctxs[i];
514+
515+
mca_spml_ucx_rkey_store_destroy(&ctx->rkey_store);
516+
ucp_worker_destroy(ctx->ucp_worker[0]);
517+
free(ctx->ucp_worker);
518+
free(ctx);
510519
}
511520

512521
if (mca_spml_ucx_ctx_default.ucp_worker) {
522+
mca_spml_ucx_rkey_store_destroy(&mca_spml_ucx_ctx_default.rkey_store);
523+
513524
for (i = 0; i < (signed int)mca_spml_ucx.ucp_workers; i++) {
514525
ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker[i]);
515526
}
516527
free(mca_spml_ucx_ctx_default.ucp_worker);
517528
}
518529

519530
if (mca_spml_ucx.aux_ctx != NULL) {
531+
mca_spml_ucx_rkey_store_destroy(&mca_spml_ucx.aux_ctx->rkey_store);
520532
ucp_worker_destroy(mca_spml_ucx.aux_ctx->ucp_worker[0]);
521533
free(mca_spml_ucx.aux_ctx->ucp_worker);
522534
}

0 commit comments

Comments
 (0)