Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
888 changes: 888 additions & 0 deletions examples/rc_pingpong.c

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions include/lpf/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -2334,6 +2334,30 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs );
extern _LPFLIB_API
lpf_err_t lpf_resize_message_queue( lpf_t ctx, size_t max_msgs );

extern _LPFLIB_API
lpf_err_t lpf_lock_slot(
lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
lpf_pid_t dst_pid,
lpf_memslot_t dst_slot,
size_t dst_offset,
size_t size,
lpf_msg_attr_t attr
);

extern _LPFLIB_API
lpf_err_t lpf_unlock_slot(
lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
lpf_pid_t dst_pid,
lpf_memslot_t dst_slot,
size_t dst_offset,
size_t size,
lpf_msg_attr_t attr
);

/**
* This function returns in @rcvd_msgs the received message count on LPF slot @slot
*/
Expand Down
37 changes: 37 additions & 0 deletions src/MPI/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,43 @@ lpf_err_t lpf_deregister(
return LPF_SUCCESS;
}


lpf_err_t lpf_lock_slot( lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
lpf_pid_t dst_pid,
lpf_memslot_t dst_slot,
size_t dst_offset,
size_t size,
lpf_msg_attr_t attr
)
{
(void) attr; // ignore parameter 'msg' since this implementation only
// implements core functionality
lpf::Interface * i = realContext(ctx);
if (!i->isAborted())
i->lockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size );
return LPF_SUCCESS;
}

lpf_err_t lpf_unlock_slot( lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
lpf_pid_t dst_pid,
lpf_memslot_t dst_slot,
size_t dst_offset,
size_t size,
lpf_msg_attr_t attr
)
{
(void) attr; // ignore parameter 'msg' since this implementation only
// implements core functionality
lpf::Interface * i = realContext(ctx);
if (!i->isAborted())
i->unlockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size );
return LPF_SUCCESS;
}

lpf_err_t lpf_put( lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
Expand Down
94 changes: 90 additions & 4 deletions src/MPI/ibverbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ void IBVerbs :: tryIncrement(Op op, Phase phase, SlotID slot) {
}
break;
case Phase::POST:
if (op == Op::RECV)
if (op == Op::RECV) {
rcvdMsgCount[slot]++;
}
if (op == Op::SEND)
sentMsgCount[slot]++;
break;
Expand Down Expand Up @@ -647,6 +648,91 @@ void IBVerbs :: dereg( SlotID id )
}


void IBVerbs :: blockingCompareAndSwap(SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap)
{
const MemorySlot & src = m_memreg.lookup( srcSlot );
const MemorySlot & dst = m_memreg.lookup( dstSlot );
char * localAddr
= static_cast<char *>(src.glob[m_pid].addr) + srcOffset;
const char * remoteAddr
= static_cast<const char *>(dst.glob[dstPid].addr) + dstOffset;

struct ibv_sge sge;
memset(&sge, 0, sizeof(sge));
sge.addr = reinterpret_cast<uintptr_t>( localAddr );
sge.length = std::min<size_t>(size, m_maxMsgSize );
sge.lkey = src.mr->lkey;

struct ibv_wc wcs[POLL_BATCH];
struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = srcSlot;
wr.sg_list = &sge;
wr.next = NULL; // this needs to be set, otherwise EINVAL return error in ibv_post_send
wr.num_sge = 1;
wr.opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
wr.send_flags = IBV_SEND_SIGNALED;
wr.wr.atomic.remote_addr = reinterpret_cast<uintptr_t>(remoteAddr);
wr.wr.atomic.compare_add = compare_add;
wr.wr.atomic.swap = swap;
wr.wr.atomic.rkey = dst.glob[dstPid].rkey;
struct ibv_send_wr *bad_wr;
int error;

blockingCompareAndSwap:
if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &wr, &bad_wr ))
{
LOG(1, "Error while posting RDMA requests: " << std::strerror(err) );
throw Exception("Error while posting RDMA requests");
}

int pollResult = 0;
while (true) {
pollResult = ibv_poll_cq(m_cqLocal.get(), POLL_BATCH, wcs);
if ( pollResult > 0) {
LOG(4, "Received " << pollResult << " acknowledgements in compare-and-swap function");

for (int i = 0; i < pollResult ; ++i) {
if (wcs[i].status != IBV_WC_SUCCESS)
{
LOG( 2, "Got bad completion status from IB message."
" status = 0x" << std::hex << wcs[i].status
<< ", vendor syndrome = 0x" << std::hex
<< wcs[i].vendor_err );
const char * status_descr;
status_descr = ibv_wc_status_str(wcs[i].status);
LOG( 2, "The work completion status string: " << status_descr);
error = 1;
}
else {
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].src_qp = "<< wcs[i].src_qp);
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].slid = "<< wcs[i].slid);
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].wr_id = "<< wcs[i].wr_id);
}
}
break;
}
else if (pollResult < 0)
{
LOG( 1, "Failed to poll IB completion queue" );
throw Exception("Poll CQ failure");
}
}

uint64_t * remoteValueFound = reinterpret_cast<uint64_t *>(localAddr);
// if we fetched the value we expected, then
// we are holding the lock now (that is, we swapped successfully!)
// else, re-post your request for the lock
if (remoteValueFound[0] != compare_add) {
LOG(2, "Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0] << " compare_add = " << compare_add << " go on, iterate\n");
goto blockingCompareAndSwap;
}
else {
LOG(2, "Process " << m_pid << " reads value " << remoteValueFound[0] << " and expected = " << compare_add <<" gets the lock, done\n");
}
// else we hold the lock and swap value
}

void IBVerbs :: put( SlotID srcSlot, size_t srcOffset,
int dstPid, SlotID dstSlot, size_t dstOffset, size_t size)
{
Expand Down Expand Up @@ -741,7 +827,7 @@ void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset,
sr->next = &srs[i+1];
sr->send_flags = 0;

sr->wr_id = m_pid;
sr->wr_id = srcSlot;

sr->sg_list = sge;
sr->num_sge = 1;
Expand Down Expand Up @@ -787,7 +873,7 @@ void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset,
}
throw Exception("Error while posting RDMA requests");
}
tryIncrement(Op::RECV, Phase::PRE, dstSlot);
tryIncrement(Op::SEND, Phase::PRE, srcSlot);

}

Expand All @@ -809,8 +895,8 @@ void IBVerbs :: wait_completion(int& error) {


error = 0;
struct ibv_wc wcs[POLL_BATCH];
LOG(5, "Polling for messages" );
struct ibv_wc wcs[POLL_BATCH];
int pollResult = ibv_poll_cq(m_cqLocal.get(), POLL_BATCH, wcs);
if ( pollResult > 0) {
LOG(4, "Received " << pollResult << " acknowledgements");
Expand Down
2 changes: 2 additions & 0 deletions src/MPI/ibverbs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class _LPFLIB_LOCAL IBVerbs
SlotID regGlobal( void * addr, size_t size );
void dereg( SlotID id );

void blockingCompareAndSwap(SlotID srSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap);

void put( SlotID srcSlot, size_t srcOffset,
int dstPid, SlotID dstSlot, size_t dstOffset, size_t size);

Expand Down
19 changes: 19 additions & 0 deletions src/MPI/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ catch ( const std::bad_alloc & e)
throw;
}


void Interface :: lockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset,
size_t size )
{
m_mesgQueue.lockSlot( srcSlot, srcOffset,
dstPid, dstSlot, dstOffset,
size );
}

void Interface :: put( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset,
size_t size )
Expand All @@ -100,6 +110,15 @@ void Interface :: put( memslot_t srcSlot, size_t srcOffset,
size );
}

void Interface :: unlockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset,
size_t size )
{
m_mesgQueue.unlockSlot( srcSlot, srcOffset,
dstPid, dstSlot, dstOffset,
size );
}

void Interface :: getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot) {
m_mesgQueue.getRcvdMsgCountPerSlot(msgs, slot);
}
Expand Down
8 changes: 8 additions & 0 deletions src/MPI/interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ class _LPFLIB_LOCAL Interface
return s_root;
}

void lockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset,
size_t size );

void unlockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset,
size_t size );

_LPFLIB_API
static void initRoot(int *argc, char ***argv);

Expand Down
22 changes: 22 additions & 0 deletions src/MPI/mesgqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,28 @@ void MessageQueue :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset,
#endif
}

void MessageQueue :: lockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size )
{
#ifdef LPF_CORE_MPI_USES_ibverbs
m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 0ULL, 1ULL);
#else
std::cerr << "Only IBVerbs::lockSlot available in this backend, abort\n";
std::abort();
#endif
}

void MessageQueue :: unlockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size )
{
#ifdef LPF_CORE_MPI_USES_ibverbs
m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 1ULL, 0ULL);
#else
std::cerr << "Only IBVerbs::unlockSlot available in this backend, abort\n";
std::abort();
#endif
}

void MessageQueue :: put( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size )
{
Expand Down
6 changes: 6 additions & 0 deletions src/MPI/mesgqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class _LPFLIB_LOCAL MessageQueue
void get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset,
memslot_t dstSlot, size_t dstOffset, size_t size );

void lockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size );

void unlockSlot( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size );

void put( memslot_t srcSlot, size_t srcOffset,
pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size );

Expand Down
26 changes: 26 additions & 0 deletions src/imp/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,32 @@ lpf_err_t lpf_counting_sync_per_slot( lpf_t lpf, lpf_sync_attr_t attr, lpf_memsl
return LPF_SUCCESS;
}

lpf_err_t lpf_lock_slot(
lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
lpf_pid_t dst_pid,
lpf_memslot_t dst_slot,
size_t dst_offset,
size_t size,
lpf_msg_attr_t attr
) {
return LPF_SUCCESS;
}

lpf_err_t lpf_unlock_slot(
lpf_t ctx,
lpf_memslot_t src_slot,
size_t src_offset,
lpf_pid_t dst_pid,
lpf_memslot_t dst_slot,
size_t dst_offset,
size_t size,
lpf_msg_attr_t attr
) {
return LPF_SUCCESS;
}

static double messageGap( lpf_pid_t p, size_t min_msg_size, lpf_sync_attr_t attr)
{
(void) p;
Expand Down
Loading