Skip to content

Commit a415d03

Browse files
committed
oshmem: Add symmetric remote key storage structure
After unpacking, we try to find an already existing equivalent remote key. When found, we destroy the new one and use the old one instead. Signed-off-by: Thomas Vegas <[email protected]>
1 parent 45fefc3 commit a415d03

File tree

3 files changed

+187
-26
lines changed

3 files changed

+187
-26
lines changed

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,128 @@ mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx)
138138
return 0;
139139
}
140140

141+
int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
142+
ucp_worker_h worker,
143+
int size)
144+
{
145+
store->array = calloc(size, sizeof(*store->array));
146+
if (store->array == NULL) {
147+
return OSHMEM_ERR_OUT_OF_RESOURCE;
148+
}
149+
150+
store->size = size;
151+
store->count = 0;
152+
store->worker = worker;
153+
return OSHMEM_SUCCESS;
154+
}
155+
156+
void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store)
157+
{
158+
int i;
159+
160+
for (i = 0; i < store->count; i++) {
161+
assert(store->array[i].refcnt == 0);
162+
ucp_rkey_destroy(store->array[i].rkey);
163+
}
164+
165+
free(store->array);
166+
store->array = NULL;
167+
store->count = 0;
168+
store->size = 0;
169+
}
170+
171+
static int mca_spml_ucx_rkey_store_find(const mca_spml_ucx_rkey_store_t *store,
172+
const ucp_rkey_h target,
173+
int *index)
174+
{
175+
#if HAVE_DECL_UCP_MEM_MAP_SYMMETRIC_RKEY
176+
ucp_rkey_compare_params_t params = {};
177+
int i, end, result, m;
178+
mca_spml_ucx_rkey_t *entry;
179+
ucs_status_t status;
180+
181+
for (i = 0, end = store->count; i < end;) {
182+
m = (i + end) / 2;
183+
entry = &store->array[m];
184+
185+
status = ucp_rkey_compare(store->worker, entry->rkey, target,
186+
&params, &result);
187+
if (status != UCS_OK) {
188+
return OSHMEM_ERROR;
189+
} else if (result == 0) {
190+
*index = m;
191+
return OSHMEM_SUCCESS;
192+
} else if (result > 0) {
193+
end = m;
194+
} else {
195+
i = m + 1;
196+
}
197+
}
198+
199+
*index = i;
200+
return OSHMEM_ERR_NOT_FOUND;
201+
#else
202+
return OSHMEM_ERROR;
203+
#endif
204+
}
205+
206+
/* Takes ownership of input ucp remote key */
207+
static ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store,
208+
ucp_worker_h worker,
209+
ucp_rkey_h target)
210+
{
211+
int ret, i;
212+
213+
if ((store->size == 0) || (worker != store->worker)) {
214+
return target;
215+
}
216+
217+
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
218+
if ((ret == OSHMEM_ERR_NOT_FOUND) && (store->count < store->size)) {
219+
memmove(&store->array[i + 1], &store->array[i],
220+
(store->count - i) * sizeof(*store->array));
221+
store->array[i].rkey = target;
222+
store->array[i].refcnt = 0;
223+
store->count++;
224+
} else if (ret != OSHMEM_SUCCESS) {
225+
assert((ret == OSHMEM_ERR_NOT_FOUND) || (ret == OSHMEM_ERROR));
226+
return target; // internal error or max capacity reached
227+
} else {
228+
ucp_rkey_destroy(target);
229+
}
230+
231+
store->array[i].refcnt++;
232+
return store->array[i].rkey;
233+
}
234+
235+
static void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store,
236+
ucp_rkey_h target)
237+
{
238+
mca_spml_ucx_rkey_t *entry;
239+
int ret, i;
240+
241+
if (store->count == 0) {
242+
ucp_rkey_destroy(target);
243+
return;
244+
}
245+
246+
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
247+
if (ret != OSHMEM_SUCCESS) {
248+
ucp_rkey_destroy(target);
249+
return;
250+
}
251+
252+
entry = &store->array[i];
253+
entry->refcnt--;
254+
if (entry->refcnt <= 0) {
255+
ucp_rkey_destroy(entry->rkey);
256+
257+
memmove(&store->array[i], &store->array[i + 1],
258+
(store->count - (i + 1)) * sizeof(*store->array));
259+
store->count--;
260+
}
261+
}
262+
141263
int mca_spml_ucx_enable(bool enable)
142264
{
143265
SPML_UCX_VERBOSE(50, "*** ucx ENABLED ****");

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -101,34 +101,48 @@ typedef struct mca_spml_ucx_ctx_array {
101101
mca_spml_ucx_ctx_t **ctxs;
102102
} mca_spml_ucx_ctx_array_t;
103103

104+
typedef struct mca_spml_ucx_rkey {
105+
ucp_rkey_h rkey;
106+
uint16_t refcnt;
107+
} mca_spml_ucx_rkey_t;
108+
109+
typedef struct mca_spml_ucx_rkey_store {
110+
int size;
111+
int count;
112+
mca_spml_ucx_rkey_t *array;
113+
ucp_worker_h worker;
114+
} mca_spml_ucx_rkey_store_t;
115+
104116
struct mca_spml_ucx {
105-
mca_spml_base_module_t super;
106-
ucp_context_h ucp_context;
107-
int num_disconnect;
108-
int heap_reg_nb;
109-
bool enabled;
117+
mca_spml_base_module_t super;
118+
ucp_context_h ucp_context;
119+
int num_disconnect;
120+
int heap_reg_nb;
121+
bool enabled;
110122
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
111-
char ***remote_addrs_tbl;
112-
mca_spml_ucx_ctx_array_t active_array;
113-
mca_spml_ucx_ctx_array_t idle_array;
114-
int priority; /* component priority */
115-
shmem_internal_mutex_t internal_mutex;
116-
pthread_mutex_t ctx_create_mutex;
123+
char ***remote_addrs_tbl;
124+
mca_spml_ucx_ctx_array_t active_array;
125+
mca_spml_ucx_ctx_array_t idle_array;
126+
int priority; /* component priority */
127+
shmem_internal_mutex_t internal_mutex;
128+
pthread_mutex_t ctx_create_mutex;
117129
/* Fields controlling aux context for put_all_nb SPML routine */
118-
bool async_progress;
119-
int async_tick;
120-
opal_event_base_t *async_event_base;
121-
opal_event_t *tick_event;
122-
mca_spml_ucx_ctx_t *aux_ctx;
123-
pthread_spinlock_t async_lock;
124-
int aux_refcnt;
125-
unsigned long nb_progress_thresh_global;
126-
unsigned long nb_put_progress_thresh;
127-
unsigned long nb_get_progress_thresh;
128-
unsigned long nb_ucp_worker_progress;
129-
unsigned int ucp_workers;
130-
unsigned int ucp_worker_cnt;
131-
bool symmetric_rkey;
130+
bool async_progress;
131+
int async_tick;
132+
opal_event_base_t *async_event_base;
133+
opal_event_t *tick_event;
134+
mca_spml_ucx_ctx_t *aux_ctx;
135+
pthread_spinlock_t async_lock;
136+
int aux_refcnt;
137+
unsigned long nb_progress_thresh_global;
138+
unsigned long nb_put_progress_thresh;
139+
unsigned long nb_get_progress_thresh;
140+
unsigned long nb_ucp_worker_progress;
141+
unsigned int ucp_workers;
142+
unsigned int ucp_worker_cnt;
143+
mca_spml_ucx_rkey_store_t rkey_store;
144+
bool symmetric_rkey;
145+
int symmetric_rkey_size;
132146
};
133147
typedef struct mca_spml_ucx mca_spml_ucx_t;
134148

@@ -284,6 +298,10 @@ extern int mca_spml_ucx_team_reduce(shmem_team_t team, void
284298
extern unsigned
285299
mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx);
286300

301+
extern int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
302+
ucp_worker_h worker, int size);
303+
extern void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store);
304+
287305
static inline int
288306
mca_spml_ucx_peer_mkey_get(ucp_peer_t *ucp_peer, int index, spml_ucx_cached_mkey_t **out_rmkey)
289307
{

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ static int mca_spml_ucx_component_register(void)
157157
"Enable remote keys deduplication to save memory at scale (best-effort)",
158158
&mca_spml_ucx.symmetric_rkey);
159159

160+
mca_spml_ucx_param_register_int("symmetric_rkey_size", 1000,
161+
"Size of the symmetric key store. Default (1000)",
162+
&mca_spml_ucx.symmetric_rkey_size);
163+
160164
mca_spml_ucx_param_register_int("async_tick_usec", 3000,
161165
"Asynchronous progress tick granularity (in usec)",
162166
&mca_spml_ucx.async_tick);
@@ -260,7 +264,7 @@ static int mca_spml_ucx_component_close(void)
260264

261265
static int spml_ucx_init(void)
262266
{
263-
unsigned int i;
267+
unsigned int i, ret;
264268
ucs_status_t err;
265269
ucp_config_t *ucp_config;
266270
ucp_params_t params;
@@ -357,6 +361,21 @@ static int spml_ucx_init(void)
357361
-1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
358362
}
359363

364+
if (mca_spml_ucx.symmetric_rkey) {
365+
if (mca_spml_ucx.symmetric_rkey_size <= 0) {
366+
SPML_UCX_ERROR("failed to create symmetric rkey store with size %d",
367+
mca_spml_ucx.symmetric_rkey_size);
368+
return OSHMEM_ERROR;
369+
}
370+
371+
ret = mca_spml_ucx_rkey_store_create(&mca_spml_ucx.rkey_store,
372+
mca_spml_ucx_ctx_default.ucp_worker[0],
373+
mca_spml_ucx.symmetric_rkey_size);
374+
if (OSHMEM_SUCCESS != ret) {
375+
SPML_UCX_ERROR("failed to initialize symmetric rkey store");
376+
}
377+
}
378+
360379
mca_spml_ucx.aux_ctx = NULL;
361380
mca_spml_ucx.aux_refcnt = 0;
362381

@@ -496,6 +515,8 @@ static int mca_spml_ucx_component_fini(void)
496515
}
497516
}
498517

518+
mca_spml_ucx_rkey_store_destroy(&mca_spml_ucx.rkey_store);
519+
499520
/* delete all workers */
500521
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
501522
ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker[0]);

0 commit comments

Comments
 (0)