From 4c6b561bbf8dcde468a394fb5bc9e03c4ee98277 Mon Sep 17 00:00:00 2001 From: Jessie Yang Date: Wed, 7 Feb 2024 10:35:14 -0800 Subject: [PATCH] coll/tuned: Extend the collective tuning file to be topology-aware TUNED collectives selection should account for communicator topology like HAN. The communicator size and message based algorithm selection logic is no longer sufficient to achieve optimal performance when HAN is used. The best algorithm differs between inter-node and intra-node for the same communicator size and message size based on the tuning results. This commit introduces topology dimension in both TUNED collective tuning file rule and the algorithm selection logic. The topological level can be intra-node, internode, or default(mixed). Specify @inter_node or @intra_node after the message size in the dynamic file rules. This is an optional feature so it will not break the old file format. See the file example in coll_tuned_dynamic_file.h Signed-off-by: Jessie Yang --- .../coll/tuned/coll_tuned_decision_dynamic.c | 28 ++++----- ompi/mca/coll/tuned/coll_tuned_dynamic_file.c | 40 ++++++++++++- ompi/mca/coll/tuned/coll_tuned_dynamic_file.h | 58 +++++++++++++++++++ .../mca/coll/tuned/coll_tuned_dynamic_rules.c | 48 +++++++++++++-- .../mca/coll/tuned/coll_tuned_dynamic_rules.h | 15 ++++- 5 files changed, 167 insertions(+), 22 deletions(-) diff --git a/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c b/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c index 4133a64eefc..68aff742334 100644 --- a/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c +++ b/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c @@ -80,7 +80,7 @@ ompi_coll_tuned_allreduce_intra_dec_dynamic (const void *sbuf, void *rbuf, int c dsize *= count; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLREDUCE], - dsize, &faninout, &segsize, &ignoreme); + dsize, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -136,7 +136,7 @@ int ompi_coll_tuned_alltoall_intra_dec_dynamic(const void *sbuf, int scount, dsize *= (ptrdiff_t)comsize * (ptrdiff_t)scount; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLTOALL], - dsize, &faninout, &segsize, &max_requests); + dsize, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -187,7 +187,7 @@ int ompi_coll_tuned_alltoallv_intra_dec_dynamic(const void *sbuf, const int *sco int alg, faninout, segsize, max_requests; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLTOALLV], - 0, &faninout, &segsize, &max_requests); + 0, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -231,7 +231,7 @@ int ompi_coll_tuned_barrier_intra_dec_dynamic(struct ompi_communicator_t *comm, int alg, faninout, segsize, ignoreme; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[BARRIER], - 0, &faninout, &segsize, &ignoreme); + 0, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -278,7 +278,7 @@ int ompi_coll_tuned_bcast_intra_dec_dynamic(void *buf, int count, dsize *= count; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[BCAST], - dsize, &faninout, &segsize, &ignoreme); + dsize, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -332,7 +332,7 @@ int ompi_coll_tuned_reduce_intra_dec_dynamic( const void *sbuf, void *rbuf, dsize *= count; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[REDUCE], - dsize, &faninout, &segsize, &max_requests); + dsize, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -388,7 +388,7 @@ int ompi_coll_tuned_reduce_scatter_intra_dec_dynamic(const void *sbuf, void *rbu dsize *= count; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[REDUCESCATTER], - dsize, &faninout, + dsize, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -442,7 +442,7 @@ int ompi_coll_tuned_reduce_scatter_block_intra_dec_dynamic(const void *sbuf, voi dsize *= rcount * size; alg = ompi_coll_tuned_get_target_method_params(tuned_module->com_rules[REDUCESCATTERBLOCK], - dsize, &faninout, + dsize, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -500,7 +500,7 @@ int ompi_coll_tuned_allgather_intra_dec_dynamic(const void *sbuf, int scount, dsize *= (ptrdiff_t)comsize * (ptrdiff_t)scount; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLGATHER], - dsize, &faninout, &segsize, &ignoreme); + dsize, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -565,7 +565,7 @@ int ompi_coll_tuned_allgatherv_intra_dec_dynamic(const void *sbuf, int scount, per_rank_size = total_size / comsize; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[ALLGATHERV], - per_rank_size, &faninout, &segsize, &ignoreme); + per_rank_size, comm, &faninout, &segsize, &ignoreme); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -618,7 +618,7 @@ int ompi_coll_tuned_gather_intra_dec_dynamic(const void *sbuf, int scount, dsize *= scount * comsize; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[GATHER], - dsize, &faninout, &segsize, &max_requests); + dsize, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -668,7 +668,7 @@ int ompi_coll_tuned_scatter_intra_dec_dynamic(const void *sbuf, int scount, dsize *= scount * comsize; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[SCATTER], - dsize, &faninout, &segsize, &max_requests); + dsize, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -714,7 +714,7 @@ int ompi_coll_tuned_exscan_intra_dec_dynamic(const void *sbuf, void* rbuf, int c dsize *= comsize; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[EXSCAN], - dsize, &faninout, &segsize, &max_requests); + dsize, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ @@ -758,7 +758,7 @@ int ompi_coll_tuned_scan_intra_dec_dynamic(const void *sbuf, void* rbuf, int cou dsize *= comsize; alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[SCAN], - dsize, &faninout, &segsize, &max_requests); + dsize, comm, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ diff --git a/ompi/mca/coll/tuned/coll_tuned_dynamic_file.c b/ompi/mca/coll/tuned/coll_tuned_dynamic_file.c index e56ece1d0b4..3b7c6618c98 100644 --- a/ompi/mca/coll/tuned/coll_tuned_dynamic_file.c +++ b/ompi/mca/coll/tuned/coll_tuned_dynamic_file.c @@ -42,6 +42,7 @@ static int fileline=0; /* used for verbose error messages */ #define getnext(fptr, pval) ompi_coll_base_file_getnext_long(fptr, &fileline, pval) +#define getnext_string(fptr, pval) ompi_coll_base_file_getnext_string(fptr, &fileline, pval) /* * Reads a rule file called fname @@ -59,7 +60,7 @@ int ompi_coll_tuned_read_rules_config_file (char *fname, ompi_coll_alg_rule_t** { long CI, NCS, CS, ALG, NMS, FANINOUT, X, MS, SS; FILE *fptr = (FILE*) NULL; - int x, ncs, nms; + int x, ncs, nms, topo_lvl; ompi_coll_alg_rule_t *alg_rules = (ompi_coll_alg_rule_t*) NULL; /* complete table of rules */ @@ -176,11 +177,46 @@ int ompi_coll_tuned_read_rules_config_file (char *fname, ompi_coll_alg_rule_t** msg_p = &(com_p->msg_rules[nms]); - if( (getnext (fptr, &MS) < 0) || (MS < 0) ) { + char *msg_topo = NULL; + if( getnext_string(fptr, &msg_topo) < 0 ) { + OPAL_OUTPUT((ompi_coll_tuned_stream,"Could not read message size/name of a topo level for collective ID %ld com rule %d msg rule %d at around line %d\n", + CI, ncs, nms, fileline)); + goto on_file_error; + } + + char *temp_str = strdup(msg_topo); + const char *delimiter = "@"; + char *msg_size_str = strtok(temp_str, delimiter); + if (NULL == msg_size_str) { OPAL_OUTPUT((ompi_coll_tuned_stream,"Could not read message size for collective ID %ld com rule %d msg rule %d at around line %d\n", CI, ncs, nms, fileline)); goto on_file_error; } + + char *endptr; + errno = 0; + MS = strtol(msg_size_str, &endptr, 10); + if (errno != 0 || (endptr == msg_size_str) || ('\0' != *endptr ) || MS < 0) { + OPAL_OUTPUT((ompi_coll_tuned_stream,"Invalid message size for collective ID %ld com rule %d msg rule %d at around line %d\n", CI, ncs, nms, fileline)); + goto on_file_error; + } msg_p->msg_size = (size_t)MS; + + char *topo_lvl_name = strtok(NULL, delimiter); + if (NULL == topo_lvl_name) { + msg_p->topologic_level = DEFAULT; + } else { + topo_lvl = mca_coll_tuned_topo_name_to_id(topo_lvl_name); + if (topo_lvl < 0) { + char *endp; + topo_lvl = (int)strtol(topo_lvl_name, &endp, 10); + if (('\0' != *endp ) || (topo_lvl < DEFAULT) || (topo_lvl >= NB_TOPO_LVL)) { + OPAL_OUTPUT((ompi_coll_tuned_stream,"Found an error at line %d: unknown topo level '%s'\n", fileline, topo_lvl_name)); + goto on_file_error; + } + } + msg_p->topologic_level = (TOPO_LVL_T)topo_lvl; + } + free (temp_str); if( (getnext (fptr, &ALG) < 0) || (ALG < 0) ) { OPAL_OUTPUT((ompi_coll_tuned_stream,"Could not read target algorithm method for collective ID %ld com rule %d msg rule %d at around line %d\n", CI, ncs, nms, fileline)); diff --git a/ompi/mca/coll/tuned/coll_tuned_dynamic_file.h b/ompi/mca/coll/tuned/coll_tuned_dynamic_file.h index 595e436fa49..00814a749a5 100644 --- a/ompi/mca/coll/tuned/coll_tuned_dynamic_file.h +++ b/ompi/mca/coll/tuned/coll_tuned_dynamic_file.h @@ -24,6 +24,64 @@ /* also need the dynamic rule structures */ #include "coll_tuned_dynamic_rules.h" +/* + * @file + * + * ####################### + * # Dynamic file format # + * ####################### + * File defined rules precede MCA parameter defined rule. + * To activate file reader, the MCA parameter use_dynamic_file_rules must + * be set to true. The path to the dynamic file is given by the MCA + * parameter dynamic_rules_filename. If there is any issue reading the file, + * the file is considered as invalid and only MCA parameter defined rules are + * used. If a potential logical issue is identified in the file, a + * warning is printed but the file is not considered as invalid. + * + * + * Here is an example of a dynamic rules file: + * 1 # number of collectives + * 3 # Collective identifier 1 (defined in ompi/mca/coll/base/coll_base_functions.h) + * 2 # number of comm sizes + * 1 # comm size 1 + * 1 # number of message size rules + * 0 1 0 0 # for message size 0, choose algorithm 1, topo 0, 0 segmentation + * 8 # comm size 8 + * 4 # number of message size rules + * 0 1 0 0 # for message size 0, choose algorithm 1, topo 0, 0 segmentation + * 32768 2 0 0 # for message size 32768, choose algorithm 2, topo 0, 0 segmentation + * 262144 1 0 0 # for message size 262144, choose algorithm 1, topo 0, 0 segmentation + * 524288 2 0 0 # for message size 524288, choose algorithm 2, topo 0, 0 segmentation + * + * Optionally, specify topological level in the message size rules, + * which can be singlenode or disjoint. + * 1 # number of collectives + * 3 # Collective identifier 1 (defined in ompi/mca/coll/base/coll_base_functions.h) + * 2 # number of comm sizes + * 1 # comm size 1 + * 2 # number of message size rules + * 0@singlenode 2 0 0 # for message size 0 and single node communication, choose algorithm 2, topo 0, 0 segmentation + * 0@disjoint 1 0 0 # for message size 0 and disjoint communication, choose algorithm 1, topo 0, 0 segmentation + * 8 # comm size 8 + * 6 # number of message size rules + * 0@singlenode 2 0 0 # for message size 0 and single node communication, choose algorithm 2, topo 0, 0 segmentation + * 0@disjoint 1 0 0 # for message size 0 and disjoint communication, choose algorithm 1, topo 0, 0 segmentation + * 32768@singlenode 2 0 0 # for message size 32768 and single node communication, choose algorithm 2, topo 0, 0 segmentation + * 32768@disjoint 1 0 0 # for message size 32768 and disjoint communication, choose algorithm 1, topo 0, 0 segmentation + * 262144 1 0 0 # for message size 262144, choose algorithm 1, topo 0, 0 segmentation + * 524288 2 0 0 # for message size 524288, choose algorithm 2, topo 0, 0 segmentation + * + * + * Note that comm size and message size rules define minimal + * values and each new rule precede every other rules. This property + * implies that this types of rules must be sorted by increasing value. + * If they are not, some rules wont be used. + * + * The counts define a stack. If the count is set to x, the reader will + * attempt to read x rules of the corresponding type. If a set of rules + * has an invalid count, this is an error and it might not be detected by + * the reader. + */ BEGIN_C_DECLS diff --git a/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c b/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c index 2c2b4469635..78442cc1efe 100644 --- a/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c +++ b/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c @@ -38,6 +38,30 @@ #include "ompi/mca/coll/base/coll_base_util.h" +/* + * topo level conversions both ways; str <-> id + * An enum is used for conversions. + */ +static mca_base_var_enum_value_t level_enumerator[] = { + { SINGLE_NODE, "singlenode" }, + { DISJOINT, "disjoint" }, + { 0 } +}; + +/* + * Stringifier for topological level + */ +int mca_coll_tuned_topo_name_to_id(const char *topo_level_name) +{ + for (int i = 0; level_enumerator[i].string != NULL; i++) { + if (0 == strcmp(topo_level_name, level_enumerator[i].string)) { + return i; + } + } + return -1; +} + + ompi_coll_alg_rule_t* ompi_coll_tuned_mk_alg_rules (int n_alg) { int i; @@ -87,6 +111,7 @@ ompi_coll_msg_rule_t* ompi_coll_tuned_mk_msg_rules (int n_msg_rules, int alg_rul msg_rules[i].com_rule_id = com_rule_id; msg_rules[i].msg_rule_id = i; msg_rules[i].msg_size = 0; /* unknown */ + msg_rules[i].topologic_level = DEFAULT; /* unknown & default */ msg_rules[i].result_alg = 0; /* unknown */ msg_rules[i].result_topo_faninout = 0; /* unknown */ msg_rules[i].result_segsize = 0; /* unknown */ @@ -327,8 +352,9 @@ ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* ru /* * This function takes a com_rule ptr (from the communicators coll tuned data structure) - * (Which is chosen for a particular MPI collective) - * and a (total_)msg_size and it returns (0) and a algorithm to use and a recommended topo faninout and segment size + * (Which is chosen for a particular MPI collective), + * a (total_)msg_size, and the communicator(comm) to which the process belongs, + * and it returns (0) and a algorithm to use and a recommended topo faninout and segment size * all based on the user supplied rules * * Just like the above functions it uses a less than or equal msg size @@ -340,8 +366,9 @@ ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* ru * */ -int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, size_t mpi_msgsize, int *result_topo_faninout, - int* result_segsize, int* max_requests) +int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, const size_t mpi_msgsize, + const struct ompi_communicator_t *comm, + int *result_topo_faninout, int* result_segsize, int* max_requests) { ompi_coll_msg_rule_t* msg_p = (ompi_coll_msg_rule_t*) NULL; ompi_coll_msg_rule_t* best_msg_p = (ompi_coll_msg_rule_t*) NULL; @@ -357,12 +384,18 @@ int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rul /* make a copy of the first msg rule */ best_msg_p = msg_p = base_com_rule->msg_rules; i = 0; + bool found_rules = false; while (in_msg_sizes) { /* OPAL_OUTPUT((ompi_coll_tuned_stream,"checking mpi_msgsize %d against com_id %d msg_id %d index %d msg_size %d", */ /* mpi_msgsize, msg_p->com_rule_id, msg_p->msg_rule_id, i, msg_p->msg_size)); */ if (msg_p->msg_size <= mpi_msgsize) { - best_msg_p = msg_p; + if (msg_p->topologic_level == DEFAULT || + (msg_p->topologic_level == SINGLE_NODE && !ompi_group_have_remote_peers(comm->c_local_group)) || + (msg_p->topologic_level == DISJOINT && OMPI_COMM_IS_INTRA(comm) && OMPI_COMM_IS_DISJOINT_SET(comm) && OMPI_COMM_IS_DISJOINT(comm))) { + best_msg_p = msg_p; + found_rules = true; + } /* OPAL_OUTPUT((ompi_coll_tuned_stream(":ok\n")); */ } else { @@ -374,6 +407,11 @@ int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rul i++; } + if (!found_rules) { + /* Fall back to fixed rules if there is no corresponding topological rule in the file */ + return (0); + } + OPAL_OUTPUT((ompi_coll_tuned_stream,"Selected the following msg rule id %d\n", best_msg_p->msg_rule_id)); ompi_coll_tuned_dump_msg_rule (best_msg_p); diff --git a/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.h b/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.h index e96bd04f6c8..22b6f22f67e 100644 --- a/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.h +++ b/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.h @@ -26,6 +26,15 @@ BEGIN_C_DECLS +/* Topologic levels */ +typedef enum TOPO_LVL { + DEFAULT = -1, + SINGLE_NODE, + DISJOINT, + NB_TOPO_LVL +} TOPO_LVL_T; + + typedef struct msg_rule_s { /* paranoid / debug */ int mpi_comsize; /* which MPI comm size is this for */ @@ -37,6 +46,7 @@ typedef struct msg_rule_s { /* RULE */ size_t msg_size; /* message size */ + TOPO_LVL_T topologic_level; /* single node or disjoint */ /* RESULT */ int result_alg; /* result algorithm to use */ @@ -94,10 +104,13 @@ int ompi_coll_tuned_free_all_rules (ompi_coll_alg_rule_t* alg_p, int n_algs); ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* rules, int alg_id, int mpi_comsize); -int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, size_t mpi_msgsize, +int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, const size_t mpi_msgsize, + const struct ompi_communicator_t *comm, int* result_topo_faninout, int* result_segsize, int* max_requests); +/* Miscellaneous function */ +int mca_coll_tuned_topo_name_to_id(const char *topo_level_name); END_C_DECLS #endif /* MCA_COLL_TUNED_DYNAMIC_RULES_H_HAS_BEEN_INCLUDED */