Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ opal_atomic_int64_t ompi_comm_next_base_cid = 1;

struct ompi_comm_cid_context_t;

/*
 * Function-pointer type for the allreduce implementation stored in a CID
 * context. A caller supplies an input buffer, an output buffer, the number
 * of int elements, the reduction operation, the CID context itself, and a
 * location that receives the resulting request. The int return value is
 * compared against OMPI_SUCCESS by the callers in this file.
 */
typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
struct ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/*
 * Function-pointer type with the same parameter list and return type as
 * above. NOTE(review): the "i" prefix presumably marks the call as
 * non-blocking (every implementation assigned to it in this file has an
 * "_nb" suffix and hands back a request) -- confirm against the
 * implementations.
 */
typedef int (*ompi_comm_iallreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
struct ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);


struct ompi_comm_cid_context_t {
Expand All @@ -78,7 +78,7 @@ struct ompi_comm_cid_context_t {
ompi_communicator_t *comm;
ompi_communicator_t *bridgecomm;

ompi_comm_allreduce_impl_fn_t allreduce_fn;
ompi_comm_iallreduce_impl_fn_t iallreduce_fn;

int nextcid;
int nextlocal_cid;
Expand Down Expand Up @@ -225,38 +225,38 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t
* for the current mode. */
switch (mode) {
case OMPI_COMM_CID_INTRA:
context->allreduce_fn = ompi_comm_allreduce_intra_nb;
context->iallreduce_fn = ompi_comm_allreduce_intra_nb;
break;
case OMPI_COMM_CID_INTER:
context->allreduce_fn = ompi_comm_allreduce_inter_nb;
context->iallreduce_fn = ompi_comm_allreduce_inter_nb;
break;
case OMPI_COMM_CID_GROUP:
case OMPI_COMM_CID_GROUP_NEW:
context->allreduce_fn = ompi_comm_allreduce_group_nb;
context->iallreduce_fn = ompi_comm_allreduce_group_nb;
context->pml_tag = ((int *) arg0)[0];
break;
case OMPI_COMM_CID_INTRA_PMIX:
context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb;
context->iallreduce_fn = ompi_comm_allreduce_intra_pmix_nb;
context->local_leader = ((int *) arg0)[0];
if (arg1) {
context->port_string = strdup ((char *) arg1);
}
context->pmix_tag = strdup ((char *) pmix_tag);
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb;
context->iallreduce_fn = ompi_comm_allreduce_intra_bridge_nb;
context->local_leader = ((int *) arg0)[0];
context->remote_leader = ((int *) arg1)[0];
break;
#if OPAL_ENABLE_FT_MPI
case OMPI_COMM_CID_INTRA_FT:
context->allreduce_fn = ompi_comm_ft_allreduce_intra_nb;
context->iallreduce_fn = ompi_comm_ft_allreduce_intra_nb;
break;
case OMPI_COMM_CID_INTER_FT:
context->allreduce_fn = ompi_comm_ft_allreduce_inter_nb;
context->iallreduce_fn = ompi_comm_ft_allreduce_inter_nb;
break;
case OMPI_COMM_CID_INTRA_PMIX_FT:
context->allreduce_fn = ompi_comm_ft_allreduce_intra_pmix_nb;
context->iallreduce_fn = ompi_comm_ft_allreduce_intra_pmix_nb;
break;
#endif /* OPAL_ENABLE_FT_MPI */
default:
Expand Down Expand Up @@ -600,8 +600,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
#endif /* OPAL_ENABLE_FT_MPI */
}

ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
context, &subreq);
ret = context->iallreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
context, &subreq);
/* there was a failure during non-blocking collective
* all we can do is abort
*/
Expand Down Expand Up @@ -666,7 +666,7 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)

++context->iter;

ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq);
ret = context->iallreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq);
if (OMPI_SUCCESS == ret) {
ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
} else {
Expand Down Expand Up @@ -774,6 +774,11 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
/* Callback function to set communicator disjointness flags */
static inline void ompi_comm_set_disjointness_nb_complete(ompi_comm_cid_context_t *context)
{
/* Only set the disjointness flags for intra-communicators */
if (OMPI_COMM_IS_INTER(*context->newcommp)) {
return;
}

if (OMPI_COMM_IS_DISJOINT_SET(*context->newcommp)) {
opal_show_help("help-comm.txt", "disjointness-set-again", true);
return;
Expand Down Expand Up @@ -870,7 +875,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c
ompi_comm_cid_context_t *context;
ompi_comm_request_t *request;
ompi_request_t *subreq;
int ret = 0, local_peers = -1;
int ret = 0;

/* the caller should not pass NULL for comm (it may be the same as *newcomm) */
assert (NULL != comm);
Expand Down Expand Up @@ -902,20 +907,19 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c
OMPI_COMM_SET_PML_ADDED(*newcomm);
}

/**
* Dual-purpose barrier:
* 1. The communicator's disjointness is inferred from max_local_peers.
* 2. After the operation it is allowed to send messages over the new communicator.
*/
local_peers = context->max_local_peers;
ret = context->allreduce_fn (&local_peers, &context->max_local_peers, 1, MPI_MAX, context,
&subreq);
if (OMPI_SUCCESS != ret) {
ompi_comm_request_return (request);
return ret;
if (OMPI_COMM_IS_INTRA(*newcomm)) {
/* The communicator's disjointness is inferred from max_local_peers. */
ret = context->iallreduce_fn (MPI_IN_PLACE, &context->max_local_peers, 1, MPI_MAX, context,
&subreq);
if (OMPI_SUCCESS != ret) {
ompi_comm_request_return (request);
return ret;
}
ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);
} else {
ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, NULL, 0);
}

ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);

ompi_comm_request_start (request);

*req = &request->super;
Expand Down