Skip to content

Commit f287afb

Browse files
committed
oshmem: Add storage structure
1 parent 3d3faba commit f287afb

File tree

3 files changed

+169
-25
lines changed

3 files changed

+169
-25
lines changed

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1680,4 +1680,118 @@ int mca_spml_ucx_team_reduce(shmem_team_t team, void
16801680
return OSHMEM_ERR_NOT_IMPLEMENTED;
16811681
}
16821682

1683+
int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
1684+
ucp_context_h ucp_context,
1685+
int size)
1686+
{
1687+
store->array = calloc(size, sizeof(*store->array));
1688+
if (store->array == NULL) {
1689+
return OSHMEM_ERR_OUT_OF_RESOURCE;
1690+
}
1691+
1692+
store->size = size;
1693+
store->count = 0;
1694+
store->ucp_context = ucp_context;
1695+
return OSHMEM_SUCCESS;
1696+
}
1697+
1698+
void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store)
1699+
{
1700+
int i;
1701+
1702+
for (i = 0; i < store->count; i++) {
1703+
assert(store->array[i].refcnt == 0);
1704+
ucp_rkey_destroy(store->array[i].rkey);
1705+
}
16831706

1707+
free(store->array);
1708+
store->count = 0;
1709+
store->size = 0;
1710+
}
1711+
1712+
static int mca_spml_ucx_rkey_store_find(const mca_spml_ucx_rkey_store_t *store,
1713+
const ucp_rkey_h target,
1714+
int *index)
1715+
{
1716+
ucp_rkey_compare_params_t params = {};
1717+
int i, end, result, m;
1718+
mca_spml_ucx_rkey_t *entry;
1719+
ucs_status_t status;
1720+
1721+
for (i = 0, end = store->count; i < end;) {
1722+
m = (i + end) / 2;
1723+
entry = &store->array[m];
1724+
1725+
status = ucp_rkey_compare(store->ucp_context, entry->rkey, target,
1726+
&params, &result);
1727+
if (status != UCS_OK) {
1728+
return OSHMEM_ERROR;
1729+
} else if (result == 0) {
1730+
*index = m;
1731+
return OSHMEM_SUCCESS;
1732+
} else if (result > 0) {
1733+
end = m;
1734+
} else {
1735+
i = m + 1;
1736+
}
1737+
}
1738+
1739+
*index = i;
1740+
return OSHMEM_ERR_NOT_FOUND;
1741+
}
1742+
1743+
/* Takes ownership of input ucp remote key */
1744+
ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store,
1745+
ucp_rkey_h target)
1746+
{
1747+
int ret, i;
1748+
1749+
if (store->size == 0) {
1750+
return target;
1751+
}
1752+
1753+
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
1754+
if ((ret == OSHMEM_ERR_NOT_FOUND) && (store->count < store->size)) {
1755+
memmove(&store->array[i + 1], &store->array[i],
1756+
(store->count - i) * sizeof(*store->array));
1757+
store->array[i].rkey = target;
1758+
store->array[i].refcnt = 0;
1759+
store->count++;
1760+
} else if (ret != OSHMEM_SUCCESS) {
1761+
assert((ret == OSHMEM_ERR_NOT_FOUND) || (ret == OSHMEM_ERROR));
1762+
return target; // internal error or max capacity reached
1763+
} else {
1764+
ucp_rkey_destroy(target);
1765+
}
1766+
1767+
store->array[i].refcnt++;
1768+
return store->array[i].rkey;
1769+
}
1770+
1771+
void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store,
1772+
ucp_rkey_h target)
1773+
{
1774+
mca_spml_ucx_rkey_t *entry;
1775+
int ret, i;
1776+
1777+
if (store->size == 0) {
1778+
ucp_rkey_destroy(target);
1779+
return;
1780+
}
1781+
1782+
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
1783+
if (ret != OSHMEM_SUCCESS) {
1784+
ucp_rkey_destroy(target);
1785+
return;
1786+
}
1787+
1788+
entry = &store->array[i];
1789+
entry->refcnt--;
1790+
if (entry->refcnt <= 0) {
1791+
ucp_rkey_destroy(entry->rkey);
1792+
1793+
memmove(&store->array[i], &store->array[i + 1],
1794+
(store->count - (i + 1)) * sizeof(*store->array));
1795+
store->count--;
1796+
}
1797+
}

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -101,33 +101,47 @@ typedef struct mca_spml_ucx_ctx_array {
101101
mca_spml_ucx_ctx_t **ctxs;
102102
} mca_spml_ucx_ctx_array_t;
103103

104+
typedef struct mca_spml_ucx_rkey {
105+
ucp_rkey_h rkey;
106+
uint16_t refcnt;
107+
} mca_spml_ucx_rkey_t;
108+
109+
typedef struct mca_spml_ucx_rkey_store {
110+
int size;
111+
int count;
112+
mca_spml_ucx_rkey_t *array;
113+
ucp_context_h ucp_context;
114+
} mca_spml_ucx_rkey_store_t;
115+
104116
struct mca_spml_ucx {
105-
mca_spml_base_module_t super;
106-
ucp_context_h ucp_context;
107-
int num_disconnect;
108-
int heap_reg_nb;
109-
bool enabled;
117+
mca_spml_base_module_t super;
118+
ucp_context_h ucp_context;
119+
int num_disconnect;
120+
int heap_reg_nb;
121+
bool enabled;
110122
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
111-
char ***remote_addrs_tbl;
112-
mca_spml_ucx_ctx_array_t active_array;
113-
mca_spml_ucx_ctx_array_t idle_array;
114-
int priority; /* component priority */
115-
shmem_internal_mutex_t internal_mutex;
116-
pthread_mutex_t ctx_create_mutex;
123+
char ***remote_addrs_tbl;
124+
mca_spml_ucx_ctx_array_t active_array;
125+
mca_spml_ucx_ctx_array_t idle_array;
126+
int priority; /* component priority */
127+
shmem_internal_mutex_t internal_mutex;
128+
pthread_mutex_t ctx_create_mutex;
117129
/* Fields controlling aux context for put_all_nb SPML routine */
118-
bool async_progress;
119-
int async_tick;
120-
opal_event_base_t *async_event_base;
121-
opal_event_t *tick_event;
122-
mca_spml_ucx_ctx_t *aux_ctx;
123-
pthread_spinlock_t async_lock;
124-
int aux_refcnt;
125-
unsigned long nb_progress_thresh_global;
126-
unsigned long nb_put_progress_thresh;
127-
unsigned long nb_get_progress_thresh;
128-
unsigned long nb_ucp_worker_progress;
129-
unsigned int ucp_workers;
130-
unsigned int ucp_worker_cnt;
130+
bool async_progress;
131+
int async_tick;
132+
opal_event_base_t *async_event_base;
133+
opal_event_t *tick_event;
134+
mca_spml_ucx_ctx_t *aux_ctx;
135+
pthread_spinlock_t async_lock;
136+
int aux_refcnt;
137+
unsigned long nb_progress_thresh_global;
138+
unsigned long nb_put_progress_thresh;
139+
unsigned long nb_get_progress_thresh;
140+
unsigned long nb_ucp_worker_progress;
141+
unsigned int ucp_workers;
142+
unsigned int ucp_worker_cnt;
143+
mca_spml_ucx_rkey_store_t rkey_store;
144+
bool symmetric_rkey;
131145
};
132146
typedef struct mca_spml_ucx mca_spml_ucx_t;
133147

@@ -280,6 +294,12 @@ extern int mca_spml_ucx_team_fcollect(shmem_team_t team, void
280294
extern int mca_spml_ucx_team_reduce(shmem_team_t team, void
281295
*dest, const void *source, size_t nreduce, int operation, int datatype);
282296

297+
extern int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
298+
ucp_context_h ucp_context, int size);
299+
extern void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store);
300+
extern ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store, const ucp_rkey_h target);
301+
extern void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store, const ucp_rkey_h target);
302+
283303

284304
static inline int
285305
mca_spml_ucx_peer_mkey_get(ucp_peer_t *ucp_peer, int index, spml_ucx_cached_mkey_t **out_rmkey)

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ static int mca_spml_ucx_component_close(void)
260260

261261
static int spml_ucx_init(void)
262262
{
263-
unsigned int i;
263+
unsigned int i, ret;
264264
ucs_status_t err;
265265
ucp_config_t *ucp_config;
266266
ucp_params_t params;
@@ -357,6 +357,14 @@ static int spml_ucx_init(void)
357357
-1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
358358
}
359359

360+
if (mca_spml_ucx.symmetric_rkey) {
361+
ret = mca_spml_ucx_rkey_store_create(&mca_spml_ucx.rkey_store,
362+
mca_spml_ucx.ucp_context, 1000);
363+
if (OSHMEM_SUCCESS != ret) {
364+
SPML_UCX_ERROR("failed to initialize symmetric rkey store");
365+
}
366+
}
367+
360368
mca_spml_ucx.aux_ctx = NULL;
361369
mca_spml_ucx.aux_refcnt = 0;
362370

@@ -527,6 +535,8 @@ static int mca_spml_ucx_component_fini(void)
527535
free(mca_spml_ucx.idle_array.ctxs);
528536
free(mca_spml_ucx.aux_ctx);
529537

538+
mca_spml_ucx_rkey_store_destroy(&mca_spml_ucx.rkey_store);
539+
530540
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
531541
pthread_mutex_destroy(&mca_spml_ucx.ctx_create_mutex);
532542

0 commit comments

Comments
 (0)