12 changes: 11 additions & 1 deletion include/lpf/core.h
@@ -2384,7 +2384,17 @@ lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t *sent_msgs, lpf_mem
* libraries.
*/
extern _LPFLIB_API
lpf_err_t lpf_flush( lpf_t ctx);
lpf_err_t lpf_flush_sent( lpf_t ctx);

/**
* This function blocks until all incoming messages waiting on the
* receive completion queue have been handled (via ibv_poll_cq).
* No concept of slots is used here.
* This allows e.g. higher-level channel libraries to reuse the
* send buffers.
*/
extern _LPFLIB_API
lpf_err_t lpf_flush_received( lpf_t ctx);

#ifdef __cplusplus
}
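
For context, here is a minimal usage sketch of the two declarations above (not part of this diff): a higher-level channel library would call lpf_flush_sent after posting its puts so the send buffer can be reused, and lpf_flush_received to drain the receive completion queue. The lpf_put call, LPF_MSG_DEFAULT, and the helper names below are illustrative assumptions based on the public lpf/core.h API, not code from this change.

    #include <lpf/core.h>

    /* Hypothetical channel-library helper: post one put, then block until the
     * local send completions have drained so the memory behind `src` may be
     * overwritten with the next payload. */
    static lpf_err_t channel_send( lpf_t ctx, lpf_memslot_t src, lpf_pid_t dst_pid,
                                   lpf_memslot_t dst, size_t size )
    {
        lpf_err_t rc = lpf_put( ctx, src, 0, dst_pid, dst, 0, size, LPF_MSG_DEFAULT );
        if( rc != LPF_SUCCESS ) return rc;
        return lpf_flush_sent( ctx ); /* send buffer is reusable once this returns */
    }

    /* Hypothetical receive-side poll: handle whatever is waiting on the receive
     * completion queue; no slot bookkeeping is involved. */
    static lpf_err_t channel_poll( lpf_t ctx )
    {
        return lpf_flush_received( ctx );
    }
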
6 changes: 4 additions & 2 deletions include/lpf/static_dispatch.h
@@ -47,7 +47,8 @@
#undef lpf_get_rcvd_msg_count_per_slot
#undef lpf_get_sent_msg_count_per_slot
#undef lpf_register_global
#undef lpf_flush
#undef lpf_flush_sent
#undef lpf_flush_received
#undef lpf_deregister
#undef lpf_probe
#undef lpf_resize_memory_register
@@ -95,7 +96,8 @@
#define lpf_get_rcvd_msg_count LPF_FUNC(get_rcvd_msg_count)
#define lpf_get_rcvd_msg_count_per_slot LPF_FUNC(get_rcvd_msg_count_per_slot)
#define lpf_get_sent_msg_count_per_slot LPF_FUNC(get_sent_msg_count_per_slot)
#define lpf_flush LPF_FUNC(flush)
#define lpf_flush_sent LPF_FUNC(flush_sent)
#define lpf_flush_received LPF_FUNC(flush_received)
#define lpf_register_global LPF_FUNC(register_global)
#define lpf_deregister LPF_FUNC(deregister)
#define lpf_probe LPF_FUNC(probe)
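
For readers unfamiliar with static_dispatch.h: LPF_FUNC renames each core symbol to an engine-specific one, which is why the two new entry points also need their own #undef/#define pair here. A hypothetical illustration of the expansion follows; the real LPF_FUNC definition lives elsewhere in the header and is not part of this diff, and the lpf_ibverbs_ prefix is only an assumed example.

    /* Illustration only: assume LPF_FUNC pastes an engine prefix onto the name. */
    #define LPF_FUNC( name ) lpf_ibverbs_ ## name

    #define lpf_flush_sent     LPF_FUNC(flush_sent)     /* -> lpf_ibverbs_flush_sent */
    #define lpf_flush_received LPF_FUNC(flush_received) /* -> lpf_ibverbs_flush_received */
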
13 changes: 11 additions & 2 deletions src/MPI/core.cpp
@@ -340,11 +340,20 @@ lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_me
return LPF_SUCCESS;
}

lpf_err_t lpf_flush( lpf_t ctx)
lpf_err_t lpf_flush_sent( lpf_t ctx)
{
lpf::Interface * i = realContext(ctx);
if (!i->isAborted()) {
i->flush();
i->flushSent();
}
return LPF_SUCCESS;
}

lpf_err_t lpf_flush_received( lpf_t ctx)
{
lpf::Interface * i = realContext(ctx);
if (!i->isAborted()) {
i->flushReceived();
}
return LPF_SUCCESS;
}
137 changes: 76 additions & 61 deletions src/MPI/ibverbs.cpp
@@ -25,7 +25,7 @@
#include <unistd.h>
#include <algorithm>

#define POLL_BATCH 8
#define POLL_BATCH 64
#define MAX_POLLING 128


@@ -82,7 +82,7 @@ IBVerbs :: IBVerbs( Communication & comm )
, m_postCount(0)
, m_recvCount(0)
, m_numMsgs(0)
, m_sendTotalInitMsgCount(0)
//, m_sendTotalInitMsgCount(0)
, m_recvTotalInitMsgCount(0)
, m_sentMsgs(0)
, m_recvdMsgs(0)
@@ -248,33 +248,43 @@ IBVerbs :: ~IBVerbs()
{ }


void IBVerbs :: tryIncrement(Op op, Phase phase, SlotID slot) {
inline void IBVerbs :: tryIncrement(Op op, Phase phase, SlotID slot) {

switch (phase) {
case Phase::INIT:
rcvdMsgCount[slot] = 0;
m_recvInitMsgCount[slot] = 0;
sentMsgCount[slot] = 0;
m_sendInitMsgCount[slot] = 0;
m_getInitMsgCount[slot] = 0;
getMsgCount[slot] = 0;
break;
case Phase::PRE:
m_numMsgs++;
if (op == Op::SEND) {
m_sendTotalInitMsgCount++;
m_numMsgs++;
//m_sendTotalInitMsgCount++;
m_sendInitMsgCount[slot]++;
}
if (op == Op::RECV) {
m_recvTotalInitMsgCount++;
m_recvInitMsgCount[slot]++;
}
if (op == Op::GET) {
m_getInitMsgCount[slot]++;
}
break;
case Phase::POST:
if (op == Op::RECV) {
m_recvdMsgs ++;
rcvdMsgCount[slot]++;
}
if (op == Op::SEND) {
m_sentMsgs++;
sentMsgCount[slot]++;
}
if (op == Op::GET) {
getMsgCount[slot]++;
}
break;
}
}
@@ -327,7 +337,7 @@ void IBVerbs :: doRemoteProgress() {
do {
pollResult = ibv_poll_cq(m_cqRemote.get(), POLL_BATCH, wcs);
if (pollResult > 0) {
LOG(3, "Process " << m_pid << " signals: I received a message in doRemoteProgress");
LOG(3, "Process " << m_pid << " signals: I received " << pollResult << " remote messages in doRemoteProgress");
}
else if (pollResult < 0)
{
@@ -358,7 +368,6 @@ void IBVerbs :: doRemoteProgress() {
SlotID slot = wcs[i].imm_data;
// Ignore compare-and-swap atomics!
if (wcs[i].opcode != IBV_WC_COMP_SWAP) {
m_recvdMsgs ++;
tryIncrement(Op::RECV, Phase::POST, slot);
LOG(3, "Rank " << m_pid << " increments received message count to " << rcvdMsgCount[slot] << " for LPF slot " << slot);
}
@@ -478,8 +487,8 @@ void IBVerbs :: reconnectQPs()
std::memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = 0x12;
attr.retry_cnt = 7;
attr.rnr_retry = 7;
attr.retry_cnt = 0;//7;
attr.rnr_retry = 0;//7;
attr.sq_psn = 0;
attr.max_rd_atomic = 1;
flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
@@ -712,11 +721,11 @@ void IBVerbs :: blockingCompareAndSwap(SlotID srcSlot, size_t srcOffset, int dst
* else, re-post your request for the lock
*/
if (remoteValueFound[0] != compare_add) {
LOG(2, "Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0] << " compare_add = " << compare_add << " go on, iterate\n");
LOG(4, "Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0] << " compare_add = " << compare_add << " go on, iterate\n");
goto blockingCompareAndSwap;
}
else {
LOG(2, "Process " << m_pid << " reads value " << remoteValueFound[0] << " and expected = " << compare_add <<" gets the lock, done\n");
LOG(4, "Process " << m_pid << " reads value " << remoteValueFound[0] << " and expected = " << compare_add <<" gets the lock, done\n");
}
// else we hold the lock and swap value into the remote slot ...
}
@@ -812,24 +821,29 @@ void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset,
sge->length = std::min<size_t>(size, m_maxMsgSize );
sge->lkey = dst.mr->lkey;

sr->next = &srs[i+1];
sr->send_flags = 0;
sr->next = NULL; // &srs[i+1];
sr->send_flags = IBV_SEND_SIGNALED; //0;

sr->sg_list = sge;
sr->num_sge = 1;
sr->opcode = IBV_WR_RDMA_READ;
sr->wr.rdma.remote_addr = reinterpret_cast<uintptr_t>( remoteAddr );
sr->wr.rdma.rkey = src.glob[srcPid].rkey;
// This logic is reversed compared to ::put
// (not srcSlot, as this slot is remote)
sr->wr_id = dstSlot;
sr->imm_data = dstSlot;

size -= sge->length;
srcOffset += sge->length;
dstOffset += sge->length;
}

// add extra "message" to do the local and remote completion
sge = &sges[numMsgs]; std::memset(sge, 0, sizeof(ibv_sge));
sr = &srs[numMsgs]; std::memset(sr, 0, sizeof(ibv_send_wr));
//sge = &sges[numMsgs]; std::memset(sge, 0, sizeof(ibv_sge));
//sr = &srs[numMsgs]; std::memset(sr, 0, sizeof(ibv_send_wr));

/*
const char * localAddr = static_cast<const char *>(dst.glob[m_pid].addr);
const char * remoteAddr = static_cast<const char *>(src.glob[srcPid].addr);

@@ -844,12 +858,14 @@
sr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
sr->sg_list = sge;
sr->num_sge = 0;
// Should srcSlot and dstSlot be reversed for get?
sr->wr_id = srcSlot;
sr->imm_data = dstSlot;
sr->wr.rdma.remote_addr = reinterpret_cast<uintptr_t>( remoteAddr );
sr->wr.rdma.rkey = src.glob[srcPid].rkey;

//Send
*/
struct ibv_send_wr *bad_wr = NULL;
if (int err = ibv_post_send(m_connectedQps[srcPid].get(), &srs[0], &bad_wr ))
{
@@ -860,7 +876,7 @@
}
throw Exception("Error while posting RDMA requests");
}
tryIncrement(Op::SEND, Phase::PRE, dstSlot);
tryIncrement(Op::GET, Phase::PRE, dstSlot);

}

@@ -886,7 +902,7 @@ std::vector<ibv_wc_opcode> IBVerbs :: wait_completion(int& error) {
int pollResult = ibv_poll_cq(m_cqLocal.get(), POLL_BATCH, wcs);
std::vector<ibv_wc_opcode> opcodes;
if ( pollResult > 0) {
LOG(4, "Received " << pollResult << " acknowledgements");
LOG(3, "Process " << m_pid << ": Received " << pollResult << " acknowledgements");

for (int i = 0; i < pollResult ; ++i) {
if (wcs[i].status != IBV_WC_SUCCESS)
@@ -901,52 +917,66 @@ std::vector<ibv_wc_opcode> IBVerbs :: wait_completion(int& error) {
error = 1;
}
else {
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].src_qp = "<< wcs[i].src_qp);
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].slid = "<< wcs[i].slid);
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].wr_id = "<< wcs[i].wr_id);
LOG(2, "Process " << m_pid << " Send wcs[" << i << "].imm_data = "<< wcs[i].imm_data);
LOG(3, "Process " << m_pid << " Send wcs[" << i << "].src_qp = "<< wcs[i].src_qp);
LOG(3, "Process " << m_pid << " Send wcs[" << i << "].slid = "<< wcs[i].slid);
LOG(3, "Process " << m_pid << " Send wcs[" << i << "].wr_id = "<< wcs[i].wr_id);
LOG(3, "Process " << m_pid << " Send wcs[" << i << "].imm_data = "<< wcs[i].imm_data);
}

SlotID slot = wcs[i].wr_id;
opcodes.push_back(wcs[i].opcode);
// Ignore compare-and-swap atomics!
if (wcs[i].opcode != IBV_WC_COMP_SWAP) {
m_sentMsgs ++;
tryIncrement(Op::SEND, Phase::POST, slot);
if (wcs[i].opcode == IBV_WC_RDMA_READ)
tryIncrement(Op::GET, Phase::POST, slot);
if (wcs[i].opcode == IBV_WC_RDMA_WRITE)
tryIncrement(Op::SEND, Phase::POST, slot);

LOG(3, "Rank " << m_pid << " increments sent message count to " << sentMsgCount[slot] << " for LPF slot " << slot);
}
}
}
else if (pollResult < 0)
{
LOG( 1, "Failed to poll IB completion queue" );
LOG( 5, "Failed to poll IB completion queue" );
throw Exception("Poll CQ failure");
}
return opcodes;
}

void IBVerbs :: flush()
void IBVerbs :: flushReceived() {
doRemoteProgress();
}

void IBVerbs :: flushSent()
{
int error = 0;

while (m_numMsgs > m_sentMsgs) {
LOG(1, "Rank " << m_pid << " m_numMsgs = " << m_numMsgs << " m_sentMsgs = " << m_sentMsgs);

wait_completion(error);
if (error) {
LOG(1, "Error in wait_completion");
std::abort();
bool sendsComplete;
do {
sendsComplete = true;
for (auto it = m_sendInitMsgCount.begin(); it != m_sendInitMsgCount.end(); it++) {
if (it->second > sentMsgCount[it->first]) {
sendsComplete = false;
wait_completion(error);
if (error) {
LOG(1, "Error in wait_completion. Most likely issue is that receiver is not calling ibv_post_srq!\n");
std::abort();
}
}
}
for (auto it = m_getInitMsgCount.begin(); it != m_getInitMsgCount.end(); it++) {
if (it->second > getMsgCount[it->first]) {
sendsComplete = false;
wait_completion(error);
if (error) {
LOG(1, "Error in wait_completion. Most likely issue is that receiver is not calling ibv_post_srq!\n");
std::abort();
}
}
}
} while (!sendsComplete);

}
if (m_numMsgs < m_sentMsgs) {

LOG(1, "Weird, m_numMsgs = " << m_numMsgs << " and m_sentMsgs = " << m_sentMsgs);
std::abort();
}

m_numMsgs = 0;
m_sentMsgs = 0;

}

@@ -1009,29 +1039,14 @@ void IBVerbs :: sync(bool resized)

int error = 0;

while (m_sendTotalInitMsgCount > m_sentMsgs) {
LOG(1, "Rank " << m_pid << " m_sendTotalInitMsgCount = " << m_sendTotalInitMsgCount << " m_sentMsgs = " << m_sentMsgs);
// flush send queues
flushSent();
// flush receive queues
flushReceived();

wait_completion(error);
if (error) {
LOG(1, "Error in wait_completion");
std::abort();
}

}
if (m_sendTotalInitMsgCount < m_sentMsgs) {

LOG(1, "Weird, m_sendTotalInitMsgCount = " << m_sendTotalInitMsgCount << " and m_sentMsgs = " << m_sentMsgs);
std::abort();
}

m_numMsgs = 0;
m_sendTotalInitMsgCount = 0;
m_sentMsgs = 0;
LOG(1, "Process " << m_pid << " will call barrier\n");
m_comm.barrier();
// at least once in a while the received queues have to be polled for!
doRemoteProgress();


}
