Skip to content

Commit ca392e2

Browse files
committed
oshmem: Address review comments
Add configuration detection, store size parameter and reorder added functions. Signed-off-by: Thomas Vegas <[email protected]>
1 parent 63f215a commit ca392e2

File tree

5 files changed

+155
-130
lines changed

5 files changed

+155
-130
lines changed

config/ompi_check_ucx.m4

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ AC_DEFUN([OMPI_CHECK_UCX],[
121121
AC_CHECK_DECLS([UCP_EP_ATTR_FIELD_TRANSPORTS],
122122
[], [],
123123
[#include <ucp/api/ucp.h>])
124+
AC_CHECK_DECLS([UCP_MEM_MAP_SYMMETRIC_RKEY],
125+
[], [],
126+
[#include <ucp/api/ucp.h>])
124127
AC_CHECK_DECLS([ucp_tag_send_nbx,
125128
ucp_tag_send_sync_nbx,
126129
ucp_tag_recv_nbx],

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 134 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,138 @@ static ucp_request_param_t mca_spml_ucx_request_param_b = {
126126
};
127127
#endif
128128

129+
unsigned
130+
mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx)
131+
{
132+
#if HAVE_DECL_UCP_MEM_MAP_SYMMETRIC_RKEY
133+
if (spml_ucx->symmetric_rkey) {
134+
return UCP_MEM_MAP_SYMMETRIC_RKEY;
135+
}
136+
#else
137+
return 0;
138+
#endif
139+
}
140+
141+
int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
142+
ucp_context_h ucp_context,
143+
int size)
144+
{
145+
store->array = calloc(size, sizeof(*store->array));
146+
if (store->array == NULL) {
147+
return OSHMEM_ERR_OUT_OF_RESOURCE;
148+
}
149+
150+
store->size = size;
151+
store->count = 0;
152+
store->ucp_context = ucp_context;
153+
return OSHMEM_SUCCESS;
154+
}
155+
156+
void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store)
157+
{
158+
int i;
159+
160+
for (i = 0; i < store->count; i++) {
161+
assert(store->array[i].refcnt == 0);
162+
ucp_rkey_destroy(store->array[i].rkey);
163+
}
164+
165+
free(store->array);
166+
store->count = 0;
167+
store->size = 0;
168+
}
169+
170+
static int mca_spml_ucx_rkey_store_find(const mca_spml_ucx_rkey_store_t *store,
171+
const ucp_rkey_h target,
172+
int *index)
173+
{
174+
#if HAVE_DECL_UCP_MEM_MAP_SYMMETRIC_RKEY
175+
ucp_rkey_compare_params_t params = {};
176+
int i, end, result, m;
177+
mca_spml_ucx_rkey_t *entry;
178+
ucs_status_t status;
179+
180+
for (i = 0, end = store->count; i < end;) {
181+
m = (i + end) / 2;
182+
entry = &store->array[m];
183+
184+
status = ucp_rkey_compare(store->ucp_context, entry->rkey, target,
185+
&params, &result);
186+
if (status != UCS_OK) {
187+
return OSHMEM_ERROR;
188+
} else if (result == 0) {
189+
*index = m;
190+
return OSHMEM_SUCCESS;
191+
} else if (result > 0) {
192+
end = m;
193+
} else {
194+
i = m + 1;
195+
}
196+
}
197+
198+
*index = i;
199+
return OSHMEM_ERR_NOT_FOUND;
200+
#else
201+
return OSHMEM_ERROR;
202+
#endif
203+
}
204+
205+
/* Takes ownership of input ucp remote key */
206+
static ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store,
207+
ucp_rkey_h target)
208+
{
209+
int ret, i;
210+
211+
if (store->size == 0) {
212+
return target;
213+
}
214+
215+
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
216+
if ((ret == OSHMEM_ERR_NOT_FOUND) && (store->count < store->size)) {
217+
memmove(&store->array[i + 1], &store->array[i],
218+
(store->count - i) * sizeof(*store->array));
219+
store->array[i].rkey = target;
220+
store->array[i].refcnt = 0;
221+
store->count++;
222+
} else if (ret != OSHMEM_SUCCESS) {
223+
assert((ret == OSHMEM_ERR_NOT_FOUND) || (ret == OSHMEM_ERROR));
224+
return target; // internal error or max capacity reached
225+
} else {
226+
ucp_rkey_destroy(target);
227+
}
228+
229+
store->array[i].refcnt++;
230+
return store->array[i].rkey;
231+
}
232+
233+
static void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store,
234+
ucp_rkey_h target)
235+
{
236+
mca_spml_ucx_rkey_t *entry;
237+
int ret, i;
238+
239+
if (store->size == 0) {
240+
ucp_rkey_destroy(target);
241+
return;
242+
}
243+
244+
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
245+
if (ret != OSHMEM_SUCCESS) {
246+
ucp_rkey_destroy(target);
247+
return;
248+
}
249+
250+
entry = &store->array[i];
251+
entry->refcnt--;
252+
if (entry->refcnt <= 0) {
253+
ucp_rkey_destroy(entry->rkey);
254+
255+
memmove(&store->array[i], &store->array[i + 1],
256+
(store->count - (i + 1)) * sizeof(*store->array));
257+
store->count--;
258+
}
259+
}
260+
129261
int mca_spml_ucx_enable(bool enable)
130262
{
131263
SPML_UCX_VERBOSE(50, "*** ucx ENABLED ****");
@@ -737,11 +869,8 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr,
737869
UCP_MEM_MAP_PARAM_FIELD_FLAGS;
738870
mem_map_params.address = addr;
739871
mem_map_params.length = size;
740-
mem_map_params.flags = flags;
741-
742-
if (mca_spml_ucx.symmetric_rkey) {
743-
mem_map_params.flags |= UCP_MEM_MAP_SYMMETRIC_RKEY;
744-
}
872+
mem_map_params.flags = flags |
873+
mca_spml_ucx_mem_map_flags_symmetric_rkey(&mca_spml_ucx);
745874

746875
status = ucp_mem_map(mca_spml_ucx.ucp_context, &mem_map_params, &mem_h);
747876
if (UCS_OK != status) {
@@ -1691,119 +1820,3 @@ int mca_spml_ucx_team_reduce(shmem_team_t team, void
16911820
{
16921821
return OSHMEM_ERR_NOT_IMPLEMENTED;
16931822
}
1694-
1695-
int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
1696-
ucp_context_h ucp_context,
1697-
int size)
1698-
{
1699-
store->array = calloc(size, sizeof(*store->array));
1700-
if (store->array == NULL) {
1701-
return OSHMEM_ERR_OUT_OF_RESOURCE;
1702-
}
1703-
1704-
store->size = size;
1705-
store->count = 0;
1706-
store->ucp_context = ucp_context;
1707-
return OSHMEM_SUCCESS;
1708-
}
1709-
1710-
void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store)
1711-
{
1712-
int i;
1713-
1714-
for (i = 0; i < store->count; i++) {
1715-
assert(store->array[i].refcnt == 0);
1716-
ucp_rkey_destroy(store->array[i].rkey);
1717-
}
1718-
1719-
free(store->array);
1720-
store->count = 0;
1721-
store->size = 0;
1722-
}
1723-
1724-
static int mca_spml_ucx_rkey_store_find(const mca_spml_ucx_rkey_store_t *store,
1725-
const ucp_rkey_h target,
1726-
int *index)
1727-
{
1728-
ucp_rkey_compare_params_t params = {};
1729-
int i, end, result, m;
1730-
mca_spml_ucx_rkey_t *entry;
1731-
ucs_status_t status;
1732-
1733-
for (i = 0, end = store->count; i < end;) {
1734-
m = (i + end) / 2;
1735-
entry = &store->array[m];
1736-
1737-
status = ucp_rkey_compare(store->ucp_context, entry->rkey, target,
1738-
&params, &result);
1739-
if (status != UCS_OK) {
1740-
return OSHMEM_ERROR;
1741-
} else if (result == 0) {
1742-
*index = m;
1743-
return OSHMEM_SUCCESS;
1744-
} else if (result > 0) {
1745-
end = m;
1746-
} else {
1747-
i = m + 1;
1748-
}
1749-
}
1750-
1751-
*index = i;
1752-
return OSHMEM_ERR_NOT_FOUND;
1753-
}
1754-
1755-
/* Takes ownership of input ucp remote key */
1756-
ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store,
1757-
ucp_rkey_h target)
1758-
{
1759-
int ret, i;
1760-
1761-
if (store->size == 0) {
1762-
return target;
1763-
}
1764-
1765-
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
1766-
if ((ret == OSHMEM_ERR_NOT_FOUND) && (store->count < store->size)) {
1767-
memmove(&store->array[i + 1], &store->array[i],
1768-
(store->count - i) * sizeof(*store->array));
1769-
store->array[i].rkey = target;
1770-
store->array[i].refcnt = 0;
1771-
store->count++;
1772-
} else if (ret != OSHMEM_SUCCESS) {
1773-
assert((ret == OSHMEM_ERR_NOT_FOUND) || (ret == OSHMEM_ERROR));
1774-
return target; // internal error or max capacity reached
1775-
} else {
1776-
ucp_rkey_destroy(target);
1777-
}
1778-
1779-
store->array[i].refcnt++;
1780-
return store->array[i].rkey;
1781-
}
1782-
1783-
void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store,
1784-
ucp_rkey_h target)
1785-
{
1786-
mca_spml_ucx_rkey_t *entry;
1787-
int ret, i;
1788-
1789-
if (store->size == 0) {
1790-
ucp_rkey_destroy(target);
1791-
return;
1792-
}
1793-
1794-
ret = mca_spml_ucx_rkey_store_find(store, target, &i);
1795-
if (ret != OSHMEM_SUCCESS) {
1796-
ucp_rkey_destroy(target);
1797-
return;
1798-
}
1799-
1800-
entry = &store->array[i];
1801-
entry->refcnt--;
1802-
if (entry->refcnt <= 0) {
1803-
ucp_rkey_destroy(entry->rkey);
1804-
1805-
memmove(&store->array[i], &store->array[i + 1],
1806-
(store->count - (i + 1)) * sizeof(*store->array));
1807-
store->count--;
1808-
}
1809-
}

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ struct mca_spml_ucx {
142142
unsigned int ucp_worker_cnt;
143143
mca_spml_ucx_rkey_store_t rkey_store;
144144
bool symmetric_rkey;
145+
int symmetric_rkey_size;
145146
};
146147
typedef struct mca_spml_ucx mca_spml_ucx_t;
147148

@@ -294,12 +295,12 @@ extern int mca_spml_ucx_team_fcollect(shmem_team_t team, void
294295
extern int mca_spml_ucx_team_reduce(shmem_team_t team, void
295296
*dest, const void *source, size_t nreduce, int operation, int datatype);
296297

298+
extern unsigned
299+
mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx);
300+
297301
extern int mca_spml_ucx_rkey_store_create(mca_spml_ucx_rkey_store_t *store,
298302
ucp_context_h ucp_context, int size);
299303
extern void mca_spml_ucx_rkey_store_destroy(mca_spml_ucx_rkey_store_t *store);
300-
extern ucp_rkey_h mca_spml_ucx_rkey_store_get(mca_spml_ucx_rkey_store_t *store, const ucp_rkey_h target);
301-
extern void mca_spml_ucx_rkey_store_put(mca_spml_ucx_rkey_store_t *store, const ucp_rkey_h target);
302-
303304

304305
static inline int
305306
mca_spml_ucx_peer_mkey_get(ucp_peer_t *ucp_peer, int index, spml_ucx_cached_mkey_t **out_rmkey)

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ static int mca_spml_ucx_component_register(void)
157157
"Enable remote keys deduplication to save memory at scale (best-effort)",
158158
&mca_spml_ucx.symmetric_rkey);
159159

160+
mca_spml_ucx_param_register_int("symmetric_rkey_size", 1000,
161+
"Size of the symmetric key store. Default (1000)",
162+
&mca_spml_ucx.symmetric_rkey_size);
163+
160164
mca_spml_ucx_param_register_int("async_tick_usec", 3000,
161165
"Asynchronous progress tick granularity (in usec)",
162166
&mca_spml_ucx.async_tick);
@@ -358,8 +362,15 @@ static int spml_ucx_init(void)
358362
}
359363

360364
if (mca_spml_ucx.symmetric_rkey) {
365+
if (mca_spml_ucx.symmetric_rkey_size <= 0) {
366+
SPML_UCX_ERROR("failed to create symmetric rkey store with size %d",
367+
mca_spml_ucx.symmetric_rkey_size);
368+
return OSHMEM_ERROR;
369+
}
370+
361371
ret = mca_spml_ucx_rkey_store_create(&mca_spml_ucx.rkey_store,
362-
mca_spml_ucx.ucp_context, 1000);
372+
mca_spml_ucx.ucp_context,
373+
mca_spml_ucx.symmetric_rkey_size);
363374
if (OSHMEM_SUCCESS != ret) {
364375
SPML_UCX_ERROR("failed to initialize symmetric rkey store");
365376
}

oshmem/mca/sshmem/ucx/sshmem_ucx_module.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,8 @@ segment_create_internal(map_segment_t *ds_buf, void *address, size_t size,
124124

125125
mem_map_params.address = address;
126126
mem_map_params.length = size;
127-
mem_map_params.flags = flags;
128-
129-
if (spml->symmetric_rkey) {
130-
mem_map_params.flags |= UCP_MEM_MAP_SYMMETRIC_RKEY;
131-
}
127+
mem_map_params.flags = flags |
128+
mca_spml_ucx_mem_map_flags_symmetric_rkey(spml);
132129

133130
status = ucp_mem_map(spml->ucp_context, &mem_map_params, &mem_h);
134131
if (UCS_OK != status) {

0 commit comments

Comments
 (0)