From ff7458a8b973a1bd89b7b9565f2d197225de4c6f Mon Sep 17 00:00:00 2001
From: Edgar Gabriel
Date: Fri, 9 Dec 2022 17:52:48 -0600
Subject: [PATCH] common/ompio: implement pipelined file_iwrite and iread

Pipelined iread/iwrite operations require the notion of subrequests,
i.e. a user-level request can contain multiple internal subrequests
that all have to complete before the user-level operation is considered
finished. This requires adjustments to the internal ompio progress
engine and data structures.

Note: this is purely a pipelined algorithm; there is no overlap between
different iterations. (An illustrative sketch of the subrequest flow is
appended after the patch.)

Signed-off-by: Edgar Gabriel
---
 ompi/mca/common/ompio/common_ompio.h          |   5 +-
 ompi/mca/common/ompio/common_ompio_buffer.h   |  24 +-
 .../mca/common/ompio/common_ompio_file_open.c |   6 +
 .../mca/common/ompio/common_ompio_file_read.c | 237 +++++++++---------
 .../common/ompio/common_ompio_file_write.c    | 206 +++++++--------
 ompi/mca/common/ompio/common_ompio_request.c  | 124 +++++++--
 ompi/mca/common/ompio/common_ompio_request.h  |  10 +
 ompi/mca/io/ompio/io_ompio_component.c        |  16 +-
 8 files changed, 352 insertions(+), 276 deletions(-)

diff --git a/ompi/mca/common/ompio/common_ompio.h b/ompi/mca/common/ompio/common_ompio.h
index 70860a4238e..4be90e27ec3 100644
--- a/ompi/mca/common/ompio/common_ompio.h
+++ b/ompi/mca/common/ompio/common_ompio.h
@@ -14,6 +14,7 @@
  * Copyright (c) 2018      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * Copyright (c) 2018      DataDirect Networks. All rights reserved.
+ * Copyright (c) 2022      Advanced Micro Devices, Inc. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -101,6 +102,8 @@
 #define OMPIO_PERM_NULL               -1
 #define OMPIO_IOVEC_INITIAL_SIZE      100
 
+extern opal_mutex_t mca_common_ompio_mutex;
+
 enum ompio_fs_type
 {
     NONE = 0,
@@ -274,7 +277,7 @@ OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, OMPI_MP
 
 OMPI_DECLSPEC int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
                                                     size_t bytes_per_cycle, size_t max_data, uint32_t iov_count,
-                                                    struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw,
+                                                    struct iovec *decoded_iov, int *ii, size_t *tbw,
                                                     size_t *spc, mca_common_ompio_io_array_t **io_array,
                                                     int *num_io_entries );
 
diff --git a/ompi/mca/common/ompio/common_ompio_buffer.h b/ompi/mca/common/ompio/common_ompio_buffer.h
index ba94c6ca80b..1d1d89eb42e 100644
--- a/ompi/mca/common/ompio/common_ompio_buffer.h
+++ b/ompi/mca/common/ompio/common_ompio_buffer.h
@@ -31,14 +31,10 @@
         opal_output(1, "common_ompio: error allocating memory\n");     \
         return OMPI_ERR_OUT_OF_RESOURCE;                               \
     }                                                                  \
-    _decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \
-    if ( NULL == _decoded_iov ) {                                      \
-        opal_output(1, "common_ompio: could not allocate memory.\n");  \
-        return OMPI_ERR_OUT_OF_RESOURCE;                               \
-    }                                                                  \
-    _decoded_iov->iov_base = _tbuf;                                    \
-    _decoded_iov->iov_len = _max_data;                                 \
-    _iov_count=1;}
+    if (NULL != _decoded_iov) {                                        \
+        ((struct iovec*)_decoded_iov)->iov_base = _tbuf;               \
+        ((struct iovec*)_decoded_iov)->iov_len = _max_data;            \
+        _iov_count=1;}}
 
 #define OMPIO_PREPARE_READ_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_tmp_buf_size,_decoded_iov,_iov_count){ \
     OBJ_CONSTRUCT( _convertor, opal_convertor_t);                      \
@@ -49,14 +45,10 @@
         opal_output(1, "common_ompio: error allocating memory\n");     \
         return OMPI_ERR_OUT_OF_RESOURCE;                               \
     }                                                                  \
-    _decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \
-    if ( NULL == _decoded_iov ) {                                      \
-        opal_output(1, "common_ompio: could not allocate memory.\n");  \
- 
return OMPI_ERR_OUT_OF_RESOURCE; \ - } \ - _decoded_iov->iov_base = _tbuf; \ - _decoded_iov->iov_len = _max_data; \ - _iov_count=1;} + if (NULL != _decoded_iov) { \ + ((struct iovec*)_decoded_iov)->iov_base = _tbuf; \ + ((struct iovec*)_decoded_iov)->iov_len = _max_data; \ + _iov_count=1;}} void mca_common_ompio_check_gpu_buf ( ompio_file_t *fh, const void *buf, int *is_gpu, int *is_managed); diff --git a/ompi/mca/common/ompio/common_ompio_file_open.c b/ompi/mca/common/ompio/common_ompio_file_open.c index 528c597d191..924994b34e3 100644 --- a/ompi/mca/common/ompio/common_ompio_file_open.c +++ b/ompi/mca/common/ompio/common_ompio_file_open.c @@ -48,6 +48,12 @@ static mca_common_ompio_generate_current_file_view_fn_t generate_current_file_view_fn; static mca_common_ompio_get_mca_parameter_value_fn_t get_mca_parameter_value_fn; +/* + * Global, component-wide OMPIO mutex + */ +opal_mutex_t mca_common_ompio_mutex = {{0}}; + + int mca_common_ompio_file_open (ompi_communicator_t *comm, const char *filename, int amode, diff --git a/ompi/mca/common/ompio/common_ompio_file_read.c b/ompi/mca/common/ompio/common_ompio_file_read.c index 479e98223e2..83bb38bf062 100644 --- a/ompi/mca/common/ompio/common_ompio_file_read.c +++ b/ompi/mca/common/ompio/common_ompio_file_read.c @@ -124,7 +124,6 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf, size_t spc=0; ssize_t ret_code=0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ mca_common_ompio_decode_datatype (fh, datatype, count, buf, &max_data, fh->f_mem_convertor, @@ -138,11 +137,10 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf, cycles, max_data); #endif - j = fh->f_index_in_file_view; for (index = 0; index < cycles; index++) { mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, max_data, iov_count, decoded_iov, - &i, &j, &total_bytes_read, &spc, + &i, &total_bytes_read, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries == 0) { ret_code = 0; @@ -182,13 +180,12 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, int index = 0; int cycles = 0; uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; size_t max_data=0, real_bytes_read=0; size_t spc=0; ssize_t ret_code=0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ char *tbuf1=NULL, *tbuf2=NULL; char *unpackbuf=NULL, *readbuf=NULL; @@ -198,7 +195,7 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, bytes_per_cycle = OMPIO_MCA_GET(fh, pipeline_buffer_size); OMPIO_PREPARE_READ_BUF (fh, buf, count, datatype, tbuf1, &convertor, - max_data, bytes_per_cycle, decoded_iov, iov_count); + max_data, bytes_per_cycle, &decoded_iov, iov_count); cycles = ceil((double)max_data/bytes_per_cycle); readbuf = unpackbuf = tbuf1; @@ -206,7 +203,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, tbuf2 = mca_common_ompio_alloc_buf (fh, bytes_per_cycle); if (NULL == tbuf2) { opal_output(1, "common_ompio: error allocating memory\n"); - free (decoded_iov); return OMPI_ERR_OUT_OF_RESOURCE; } unpackbuf = tbuf2; @@ -236,15 +232,14 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, ** - unpack buffer i */ - j = fh->f_index_in_file_view; if (can_overlap) { mca_common_ompio_register_progress (); } for (index = 0; index < cycles+1; index++) { if (index < cycles) { - decoded_iov->iov_base = readbuf; - 
decoded_iov->iov_len = bytes_per_cycle;
+            decoded_iov.iov_base = readbuf;
+            decoded_iov.iov_len  = bytes_per_cycle;
             bytes_this_cycle = (index == cycles-1) ?
                                (max_data - (index * bytes_per_cycle)) :
                                bytes_per_cycle;
@@ -255,7 +250,7 @@
 
             mca_common_ompio_build_io_array (fh, index, cycles,
                                              bytes_per_cycle, bytes_this_cycle, iov_count,
-                                             decoded_iov, &i, &j, &tbr, &spc,
+                                             &decoded_iov, &i, &tbr, &spc,
                                              &fh->f_io_array, &fh->f_num_of_io_entries);
             if (fh->f_num_of_io_entries == 0) {
                 ret_code = 0;
@@ -263,7 +258,7 @@
         }
 
         if (can_overlap) {
-            mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ);
+            mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_READ);
             fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *)ompio_req);
         } else {
             ret_code = fh->f_fbtl->fbtl_preadv (fh);
@@ -293,9 +288,9 @@
 
         if ((can_overlap && index != 0) || (!can_overlap && index < cycles)) {
             size_t pos = 0;
-            decoded_iov->iov_base = unpackbuf;
-            decoded_iov->iov_len = can_overlap ? bytes_prev_cycle : bytes_this_cycle;
-            opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos);
+            decoded_iov.iov_base = unpackbuf;
+            decoded_iov.iov_len = can_overlap ? bytes_prev_cycle : bytes_this_cycle;
+            opal_convertor_unpack (&convertor, &decoded_iov, &iov_count, &pos);
         }
 
         fh->f_num_of_io_entries = 0;
@@ -317,7 +312,6 @@
         mca_common_ompio_release_buf (fh, tbuf2);
     }
 
-    free (decoded_iov);
    if ( MPI_STATUS_IGNORE != status ) {
         status->_ucount = real_bytes_read;
     }
@@ -352,53 +346,95 @@
     return ret;
 }
 
+static void mca_common_ompio_post_next_read_subreq(struct mca_ompio_request_t *req, int index)
+{
+    uint32_t iov_count = 1;
+    size_t pos = 0, spc = 0, tbw = 0;
+    int i = 0;
+    mca_ompio_request_t *ompio_subreq=NULL;
+    size_t bytes_per_cycle = OMPIO_MCA_GET(req->req_fh, pipeline_buffer_size);
+    struct iovec decoded_iov;
+
+    /* Step 1: finish the unpack operation of subrequest index-1 */
+    if (index - 1 >= 0) {
+        size_t num_bytes = bytes_per_cycle;
+        /**
+         * Really 'req_num_subreqs - 1 == index - 1', i.e. the last
+         * subrequest; that simplifies to the check below.
+         */
+        if (req->req_num_subreqs == index) {
+            num_bytes = req->req_max_data - (index-1)* bytes_per_cycle;
+        }
+        decoded_iov.iov_base = req->req_tbuf;
+        decoded_iov.iov_len = num_bytes;
+        opal_convertor_unpack (&req->req_convertor, &decoded_iov, &iov_count, &pos);
+    }
+
+    /* Step 2: post the next iread subrequest */
+    if (req->req_num_subreqs == index) {
+        /* all done */
+        return;
+    }
+
+    decoded_iov.iov_base = req->req_tbuf;
+    decoded_iov.iov_len = (req->req_num_subreqs-1 == index) ? 
+ req->req_max_data - (index* bytes_per_cycle) : req->req_size; + mca_common_ompio_build_io_array (req->req_fh, index, req->req_num_subreqs, + bytes_per_cycle, decoded_iov.iov_len, + iov_count, &decoded_iov, + &i, &tbw, &spc, + &req->req_fh->f_io_array, + &req->req_fh->f_num_of_io_entries); + + mca_common_ompio_request_alloc ( &ompio_subreq, MCA_OMPIO_REQUEST_READ); + ompio_subreq->req_parent = req; + req->req_fh->f_fbtl->fbtl_ipreadv (req->req_fh, (ompi_request_t *)ompio_subreq); + + free(req->req_fh->f_io_array); + req->req_fh->f_io_array = NULL; + req->req_fh->f_num_of_io_entries = 0; +} int mca_common_ompio_file_iread (ompio_file_t *fh, - void *buf, - int count, - struct ompi_datatype_t *datatype, - ompi_request_t **request) + void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_request_t **request) { int ret = OMPI_SUCCESS; mca_ompio_request_t *ompio_req=NULL; - size_t spc=0; + struct iovec *decoded_iov = NULL; if (fh->f_amode & MPI_MODE_WRONLY){ -// opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n"); ret = MPI_ERR_ACCESS; - return ret; + return ret; } - mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ); + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_READ); - if ( 0 == count ) { + if (0 == count || 0 == fh->f_iov_count) { ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; ompio_req->req_ompi.req_status._ucount = 0; ompi_request_complete (&ompio_req->req_ompi, false); *request = (ompi_request_t *) ompio_req; - + return OMPI_SUCCESS; } - if ( NULL != fh->f_fbtl->fbtl_ipreadv ) { + if (NULL != fh->f_fbtl->fbtl_ipreadv) { // This fbtl has support for non-blocking operations - - size_t total_bytes_read = 0; /* total bytes that have been read*/ uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; - size_t max_data = 0; - int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ - bool need_to_copy = false; - int is_gpu, is_managed; - mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed); - if ( is_gpu && !is_managed ) { + + mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); + if (is_gpu && !is_managed) { need_to_copy = true; } + mca_common_ompio_register_progress (); + if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) && !(datatype == &ompi_mpi_byte.dt || datatype == &ompi_mpi_char.dt )) { @@ -411,88 +447,61 @@ int mca_common_ompio_file_iread (ompio_file_t *fh, need_to_copy = true; } - if ( need_to_copy ) { - char *tbuf=NULL; + if (need_to_copy) { + size_t pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size); - OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &ompio_req->req_convertor, - max_data, 0, decoded_iov, iov_count); - - ompio_req->req_tbuf = tbuf; - ompio_req->req_size = max_data; + OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, ompio_req->req_tbuf, + &ompio_req->req_convertor, max_data, + pipeline_buf_size, NULL, iov_count); + + ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size); + ompio_req->req_size = pipeline_buf_size; + ompio_req->req_max_data = max_data; + ompio_req->req_post_next_subreq = mca_common_ompio_post_next_read_subreq; + ompio_req->req_fh = fh; + ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS; + + mca_common_ompio_post_next_read_subreq (ompio_req, 0); } else { - mca_common_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, - fh->f_mem_convertor, - &decoded_iov, - &iov_count); - } - - if ( 0 < max_data && 0 == fh->f_iov_count ) { - 
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; - ompio_req->req_ompi.req_status._ucount = 0; - ompi_request_complete (&ompio_req->req_ompi, false); - *request = (ompi_request_t *) ompio_req; - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - - return OMPI_SUCCESS; - } - - // Non-blocking operations have to occur in a single cycle - j = fh->f_index_in_file_view; - - mca_common_ompio_build_io_array ( fh, - 0, // index - 1, // no. of cyces - max_data, // setting bytes per cycle to match data - max_data, - iov_count, - decoded_iov, - &i, - &j, - &total_bytes_read, - &spc, - &fh->f_io_array, - &fh->f_num_of_io_entries); - - if (fh->f_num_of_io_entries) { - fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req); - } - - mca_common_ompio_register_progress (); - - fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; - } + int i = 0; + size_t spc = 0, tbr = 0; + mca_common_ompio_decode_datatype (fh, datatype, count, buf, + &max_data, fh->f_mem_convertor, + &decoded_iov, &iov_count); + + /** + * Non-blocking operations have to occur in a single cycle + * If the f_io_array is too long, the fbtl will chunk it up + * internally. + */ + mca_common_ompio_build_io_array (fh, 0, 1, max_data, max_data, + iov_count, decoded_iov, &i, + &tbr, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } + fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req); + } } else { - // This fbtl does not support non-blocking operations - ompi_status_public_t status; - ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status); + // This fbtl does not support non-blocking operations + ompi_status_public_t status; + ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status); - ompio_req->req_ompi.req_status.MPI_ERROR = ret; - ompio_req->req_ompi.req_status._ucount = status._ucount; - ompi_request_complete (&ompio_req->req_ompi, false); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = status._ucount; + ompi_request_complete (&ompio_req->req_ompi, false); } + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + free (decoded_iov); + *request = (ompi_request_t *) ompio_req; return ret; } - int mca_common_ompio_file_iread_at (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset, void *buf, @@ -554,24 +563,20 @@ int mca_common_ompio_file_read_all (ompio_file_t *fh, size_t pos=0, max_data=0; char *tbuf=NULL; opal_convertor_t convertor; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; uint32_t iov_count = 0; OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &convertor, - max_data, 0, decoded_iov, iov_count); + max_data, 0, &decoded_iov, iov_count); ret = fh->f_fcoll->fcoll_file_read_all (fh, - decoded_iov->iov_base, - decoded_iov->iov_len, + decoded_iov.iov_base, + decoded_iov.iov_len, MPI_BYTE, status); - opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos ); + opal_convertor_unpack (&convertor, &decoded_iov, &iov_count, &pos ); opal_convertor_cleanup (&convertor); - mca_common_ompio_release_buf (fh, decoded_iov->iov_base); - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } + mca_common_ompio_release_buf (fh, decoded_iov.iov_base); } else { ret = fh->f_fcoll->fcoll_file_read_all (fh, diff --git a/ompi/mca/common/ompio/common_ompio_file_write.c b/ompi/mca/common/ompio/common_ompio_file_write.c index 
e2ceb8187c9..32848c09e3e 100644 --- a/ompi/mca/common/ompio/common_ompio_file_write.c +++ b/ompi/mca/common/ompio/common_ompio_file_write.c @@ -107,7 +107,6 @@ int mca_common_ompio_file_write_default (ompio_file_t *fh, ssize_t ret_code = 0; size_t spc = 0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file view iovec */ mca_common_ompio_decode_datatype (fh, datatype, count, buf, &max_data, @@ -117,12 +116,11 @@ int mca_common_ompio_file_write_default (ompio_file_t *fh, bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size); cycles = ceil((double)max_data/bytes_per_cycle); - j = fh->f_index_in_file_view; for (index = 0; index < cycles; index++) { mca_common_ompio_build_io_array ( fh, index, cycles, bytes_per_cycle, max_data, iov_count, decoded_iov, - &i, &j, &total_bytes_written, &spc, + &i, &total_bytes_written, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries == 0) { ret_code = 0; @@ -162,13 +160,12 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, int cycles = 0; uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; size_t bytes_per_cycle=0, tbw = 0; size_t max_data=0, real_bytes_written=0; ssize_t ret_code=0; size_t spc=0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file view iovec */ size_t pos=0; char *tbuf1=NULL, *tbuf2=NULL; @@ -179,7 +176,7 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, bytes_per_cycle = OMPIO_MCA_GET(fh, pipeline_buffer_size); OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf1, &convertor, - max_data, bytes_per_cycle, decoded_iov, iov_count); + max_data, bytes_per_cycle, &decoded_iov, iov_count); cycles = ceil((double)max_data/bytes_per_cycle); packbuf = tbuf1; @@ -188,7 +185,6 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, tbuf2 = mca_common_ompio_alloc_buf (fh, bytes_per_cycle); if (NULL == tbuf2) { opal_output(1, "common_ompio: error allocating memory\n"); - free (decoded_iov); return OMPI_ERR_OUT_OF_RESOURCE; } writebuf = tbuf2; @@ -213,25 +209,24 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, ** - post pwrite for iter i */ - j = fh->f_index_in_file_view; if (can_overlap) { mca_common_ompio_register_progress (); } for (index = 0; index <= cycles; index++) { if (index < cycles) { - decoded_iov->iov_base = packbuf; - decoded_iov->iov_len = bytes_per_cycle; + decoded_iov.iov_base = packbuf; + decoded_iov.iov_len = bytes_per_cycle; iov_count = 1; - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos); + opal_convertor_pack (&convertor, &decoded_iov, &iov_count, &pos); spc = 0; tbw = 0; i = 0; mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, pos, - iov_count, decoded_iov, - &i, &j, &tbw, &spc, + iov_count, &decoded_iov, + &i, &tbw, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries== 0) { ret_code = 0; @@ -283,7 +278,6 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, } opal_convertor_cleanup (&convertor); - free (decoded_iov); if ( MPI_STATUS_IGNORE != status ) { status->_ucount = real_bytes_written; @@ -316,42 +310,72 @@ int mca_common_ompio_file_write_at (ompio_file_t *fh, return ret; } +static void mca_common_ompio_post_next_write_subreq(struct mca_ompio_request_t *req, int index) +{ + uint32_t iov_count = 1; + size_t bytes_per_cycle = OMPIO_MCA_GET(req->req_fh, pipeline_buffer_size); + size_t pos=0, spc = 0, tbw = 0; + int i = 0; + mca_ompio_request_t *ompio_subreq=NULL; + 
struct iovec decoded_iov; + + if (req->req_num_subreqs == index) { + /* all done */ + return; + } + + decoded_iov.iov_base = req->req_tbuf; + decoded_iov.iov_len = req->req_size; + opal_convertor_pack (&req->req_convertor, &decoded_iov, &iov_count, &pos); + mca_common_ompio_build_io_array (req->req_fh, index, req->req_num_subreqs, + bytes_per_cycle, pos, + iov_count, &decoded_iov, + &i, &tbw, &spc, + &req->req_fh->f_io_array, + &req->req_fh->f_num_of_io_entries); + + mca_common_ompio_request_alloc (&ompio_subreq, MCA_OMPIO_REQUEST_WRITE); + ompio_subreq->req_parent = req; + req->req_fh->f_fbtl->fbtl_ipwritev (req->req_fh, (ompi_request_t *)ompio_subreq); + + free(req->req_fh->f_io_array); + req->req_fh->f_io_array = NULL; + req->req_fh->f_num_of_io_entries = 0; +} + int mca_common_ompio_file_iwrite (ompio_file_t *fh, - const void *buf, - int count, - struct ompi_datatype_t *datatype, - ompi_request_t **request) + const void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_request_t **request) { int ret = OMPI_SUCCESS; mca_ompio_request_t *ompio_req=NULL; + struct iovec *decoded_iov = NULL; size_t spc=0; if (fh->f_amode & MPI_MODE_RDONLY){ ret = MPI_ERR_READ_ONLY; - return ret; + return ret; } - - mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE); - if ( 0 == count ) { + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_WRITE); + + if (0 == count || 0 == fh->f_iov_count) { ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; ompio_req->req_ompi.req_status._ucount = 0; ompi_request_complete (&ompio_req->req_ompi, false); *request = (ompi_request_t *) ompio_req; - + return OMPI_SUCCESS; } - if ( NULL != fh->f_fbtl->fbtl_ipwritev ) { + if (NULL != fh->f_fbtl->fbtl_ipwritev) { /* This fbtl has support for non-blocking operations */ - uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; size_t max_data = 0; size_t total_bytes_written =0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ - bool need_to_copy = false; int is_gpu, is_managed; @@ -359,10 +383,11 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, if ( is_gpu && !is_managed ) { need_to_copy = true; } + mca_common_ompio_register_progress (); - if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) && + if ( !(fh->f_flags & OMPIO_DATAREP_NATIVE) && !(datatype == &ompi_mpi_byte.dt || - datatype == &ompi_mpi_char.dt )) { + datatype == &ompi_mpi_char.dt) ) { /* only need to copy if any of these conditions are given: 1. buffer is an unmanaged device buffer (checked above). 2. 
Datarepresentation is anything other than 'native' and @@ -372,77 +397,40 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, need_to_copy = true; } - if ( need_to_copy ) { - size_t pos=0; - char *tbuf=NULL; - opal_convertor_t convertor; - - OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf, &convertor, - max_data, 0, decoded_iov, iov_count); - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); - opal_convertor_cleanup (&convertor); - - ompio_req->req_tbuf = tbuf; - ompio_req->req_size = max_data; + if (need_to_copy) { + size_t pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size); + OMPIO_PREPARE_BUF (fh, buf, count, datatype, ompio_req->req_tbuf, + &ompio_req->req_convertor, max_data, + pipeline_buf_size, NULL, iov_count); + + ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size); + ompio_req->req_size = pipeline_buf_size; + ompio_req->req_max_data = max_data; + ompio_req->req_post_next_subreq = mca_common_ompio_post_next_write_subreq; + ompio_req->req_fh = fh; + ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS; + + mca_common_ompio_post_next_write_subreq (ompio_req, 0); } else { - mca_common_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, + mca_common_ompio_decode_datatype (fh, datatype, count, + buf, &max_data, fh->f_mem_convertor, - &decoded_iov, - &iov_count); - } + &decoded_iov, &iov_count); + + /** + * Non blocking operations have to occur in a single cycle + * If the f_io_array is too long, the fbtl will chunk it up + * internally. + */ + mca_common_ompio_build_io_array ( fh, 0, 1, max_data, max_data, + iov_count, decoded_iov, + &i, &total_bytes_written, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); - if ( 0 < max_data && 0 == fh->f_iov_count ) { - ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; - ompio_req->req_ompi.req_status._ucount = 0; - ompi_request_complete (&ompio_req->req_ompi, false); - *request = (ompi_request_t *) ompio_req; - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - - return OMPI_SUCCESS; - } - - j = fh->f_index_in_file_view; - - /* Non blocking operations have to occur in a single cycle */ - mca_common_ompio_build_io_array ( fh, - 0, // index of current cycle iteration - 1, // number of cycles - max_data, // setting bytes_per_cycle to max_data - max_data, - iov_count, - decoded_iov, - &i, - &j, - &total_bytes_written, - &spc, - &fh->f_io_array, - &fh->f_num_of_io_entries); - - if (fh->f_num_of_io_entries) { fh->f_fbtl->fbtl_ipwritev (fh, (ompi_request_t *) ompio_req); } - - mca_common_ompio_register_progress (); - - fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; - } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - } - else { + } else { // This fbtl does not support non-blocking write operations ompi_status_public_t status; ret = mca_common_ompio_file_write(fh,buf,count,datatype, &status); @@ -452,6 +440,11 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, ompi_request_complete (&ompio_req->req_ompi, false); } + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + free (decoded_iov); + *request = (ompi_request_t *) ompio_req; return ret; } @@ -516,26 +509,22 @@ int mca_common_ompio_file_write_all (ompio_file_t *fh, size_t pos=0, max_data=0; char *tbuf=NULL; opal_convertor_t convertor; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; uint32_t iov_count = 0; OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf, 
&convertor, - max_data, 0, decoded_iov, iov_count); - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); - opal_convertor_cleanup ( &convertor); + max_data, 0, &decoded_iov, iov_count); + opal_convertor_pack (&convertor, &decoded_iov, &iov_count, &pos ); + opal_convertor_cleanup (&convertor); ret = fh->f_fcoll->fcoll_file_write_all (fh, - decoded_iov->iov_base, - decoded_iov->iov_len, + decoded_iov.iov_base, + decoded_iov.iov_len, MPI_BYTE, status); - mca_common_ompio_release_buf (fh, decoded_iov->iov_base); - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } + mca_common_ompio_release_buf (fh, decoded_iov.iov_base); } else { ret = fh->f_fcoll->fcoll_file_write_all (fh, @@ -622,7 +611,7 @@ int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, size_t bytes_per_cycle, size_t max_data, uint32_t iov_count, - struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw, + struct iovec *decoded_iov, int *ii, size_t *tbw, size_t *spc, mca_common_ompio_io_array_t **io_array, int *num_io_entries) { @@ -636,7 +625,7 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, size_t sum_previous_length = 0; int k = 0; /* index into the io_array */ int i = *ii; - int j = *jj; + int j = fh->f_index_in_file_view; mca_common_ompio_io_array_t *f_io_array=NULL; int f_num_io_entries=0; @@ -742,7 +731,6 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, } #endif *ii = i; - *jj = j; *tbw = total_bytes_written; *spc = sum_previous_counts; *io_array = f_io_array; diff --git a/ompi/mca/common/ompio/common_ompio_request.c b/ompi/mca/common/ompio/common_ompio_request.c index 40b99043da9..09cafb787ee 100644 --- a/ompi/mca/common/ompio/common_ompio_request.c +++ b/ompi/mca/common/ompio/common_ompio_request.c @@ -11,6 +11,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008-2019 University of Houston. All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -30,8 +31,6 @@ bool mca_common_ompio_progress_is_registered=false; */ opal_list_t mca_common_ompio_pending_requests = {{0}}; - - static int mca_common_ompio_request_free ( struct ompi_request_t **req) { mca_ompio_request_t *ompio_req = ( mca_ompio_request_t *)*req; @@ -69,19 +68,27 @@ OBJ_CLASS_INSTANCE(mca_ompio_request_t, ompi_request_t, void mca_common_ompio_request_construct(mca_ompio_request_t* req) { OMPI_REQUEST_INIT (&(req->req_ompi), false ); - req->req_ompi.req_free = mca_common_ompio_request_free; - req->req_ompi.req_cancel = mca_common_ompio_request_cancel; - req->req_ompi.req_type = OMPI_REQUEST_IO; - req->req_data = NULL; - req->req_tbuf = NULL; - req->req_size = 0; - req->req_progress_fn = NULL; - req->req_free_fn = NULL; + req->req_ompi.req_free = mca_common_ompio_request_free; + req->req_ompi.req_cancel = mca_common_ompio_request_cancel; + req->req_ompi.req_type = OMPI_REQUEST_IO; + req->req_data = NULL; + req->req_tbuf = NULL; + req->req_size = 0; + req->req_max_data = 0; + req->req_progress_fn = NULL; + req->req_free_fn = NULL; + req->req_parent = NULL; + req->req_post_next_subreq = NULL; + req->req_num_subreqs = 0; + req->req_subreqs_completed = 0; + req->req_fh = NULL; + req->req_post_followup = false; OBJ_CONSTRUCT(&req->req_item, opal_list_item_t); opal_list_append (&mca_common_ompio_pending_requests, &req->req_item); return; } + void mca_common_ompio_request_destruct(mca_ompio_request_t* req) { OMPI_REQUEST_FINI ( &(req->req_ompi)); @@ -107,6 +114,15 @@ void mca_common_ompio_request_fini ( void ) were not destroyed / completed upon MPI_FINALIZE */ OBJ_DESTRUCT(&mca_common_ompio_pending_requests); + if (mca_common_ompio_progress_is_registered) { + OPAL_THREAD_LOCK (&mca_common_ompio_mutex); + if (mca_common_ompio_progress_is_registered) { + opal_progress_unregister(mca_common_ompio_progress); + mca_common_ompio_progress_is_registered=false; + } + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); + } + return; } @@ -125,33 +141,97 @@ void mca_common_ompio_request_alloc ( mca_ompio_request_t **req, mca_ompio_reque void mca_common_ompio_register_progress ( void ) { - if ( false == mca_common_ompio_progress_is_registered) { + if (false == mca_common_ompio_progress_is_registered) { + OPAL_THREAD_LOCK (&mca_common_ompio_mutex); + if (mca_common_ompio_progress_is_registered) { + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); + return; + } opal_progress_register (mca_common_ompio_progress); mca_common_ompio_progress_is_registered=true; + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); } return; } + int mca_common_ompio_progress ( void ) { mca_ompio_request_t *req=NULL; opal_list_item_t *litem=NULL; int completed=0; - OPAL_LIST_FOREACH(litem, &mca_common_ompio_pending_requests, opal_list_item_t) { - req = GET_OMPIO_REQ_FROM_ITEM(litem); - if( REQUEST_COMPLETE(&req->req_ompi) ) { - continue; - } - if ( NULL != req->req_progress_fn ) { - if ( req->req_progress_fn(req) ) { - completed++; - ompi_request_complete (&req->req_ompi, true); - /* The fbtl progress function is expected to set the - * status elements + if (!OPAL_THREAD_TRYLOCK(&mca_common_ompio_mutex)) { + OPAL_LIST_FOREACH(litem, &mca_common_ompio_pending_requests, opal_list_item_t) { + req = GET_OMPIO_REQ_FROM_ITEM(litem); + if (REQUEST_COMPLETE(&req->req_ompi) ) { + continue; + } + if (NULL != req->req_progress_fn) { + if (req->req_progress_fn(req)) { + /** + * To support pipelined read/write operations, a user level request + * can contain multiple internal requests. 
These sub-requests
+                     * contain a pointer to the parent request.
+                     */
+                    mca_ompio_request_t *parent = req->req_parent;
+                    if (NULL != parent) {
+                        /* This is a subrequest */
+                        if (OMPI_SUCCESS != req->req_ompi.req_status.MPI_ERROR) {
+                            parent->req_ompi.req_status.MPI_ERROR = req->req_ompi.req_status.MPI_ERROR;
+                            ompi_request_complete (&parent->req_ompi, true);
+                            continue;
+                        }
+                        parent->req_subreqs_completed++;
+                        parent->req_ompi.req_status._ucount += req->req_ompi.req_status._ucount;
+                        req->req_post_followup = true;
+                    } else {
+                        /* This is a request without subrequests */
+                        completed++;
+                        ompi_request_complete (&req->req_ompi, true);
+                    }
+                    /* The fbtl progress function is expected to set the
+                     * status elements
+                     */
+                }
+            } else {
+                /* This is a request without a lower-level progress function, e.g.
+                 * a parent request */
+                if (req->req_num_subreqs == req->req_subreqs_completed) {
+                    completed++;
+                    ompi_request_complete (&req->req_ompi, true);
+                }
             }
         }
+        /**
+         * Splitting the ompio progress loop is necessary to prevent a pending
+         * operation consisting of multiple subrequests from being executed in a
+         * single invocation of the progress function.
+         *
+         * Otherwise the next subrequest would be posted, end up at the tail of
+         * the ompio_pending_requests_list, and be processed in the same loop
+         * execution; that in turn would post the next subrequest, which again
+         * might be processed right away, etc. This would make the ompio_progress
+         * function block for a long time, and prevent overlapping operations.
+         *
+         * Splitting the loop into two parts, one checking for completion and
+         * one posting the next subrequest if necessary, avoids the problem.
+         */
+        OPAL_LIST_FOREACH(litem, &mca_common_ompio_pending_requests, opal_list_item_t) {
+            req = GET_OMPIO_REQ_FROM_ITEM(litem);
+            if (true == req->req_post_followup) {
+                if (OPAL_THREAD_TRYLOCK(&req->req_fh->f_fh->f_lock)) {
+                    continue;
+                }
+                mca_ompio_request_t *parent = req->req_parent;
+                parent->req_post_next_subreq(parent, parent->req_subreqs_completed);
+                OPAL_THREAD_UNLOCK(&req->req_fh->f_fh->f_lock);
+                ompi_request_complete (&req->req_ompi, false);
+                ompi_request_free ((ompi_request_t**)&req);
+            }
+        }
+        OPAL_THREAD_UNLOCK(&mca_common_ompio_mutex);
     }
     return completed;

diff --git a/ompi/mca/common/ompio/common_ompio_request.h b/ompi/mca/common/ompio/common_ompio_request.h
index 18083862df9..173f8468242 100644
--- a/ompi/mca/common/ompio/common_ompio_request.h
+++ b/ompi/mca/common/ompio/common_ompio_request.h
@@ -13,6 +13,7 @@
  * Copyright (c) 2008-2019 University of Houston. All rights reserved.
  * Copyright (c) 2018      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2022      Advanced Micro Devices, Inc. All rights reserved.
* $COPYRIGHT$ * * Additional copyrights may follow @@ -43,6 +44,8 @@ typedef enum { MCA_OMPIO_REQUEST_READ_ALL, } mca_ompio_request_type_t; +struct mca_ompio_request_t; +typedef void(*mca_ompio_post_next_subreq_t)(struct mca_ompio_request_t *req, int val); /** * Main structure for OMPIO requests @@ -54,9 +57,16 @@ struct mca_ompio_request_t { opal_list_item_t req_item; void *req_tbuf; size_t req_size; + size_t req_max_data; opal_convertor_t req_convertor; mca_fbtl_base_module_progress_fn_t req_progress_fn; mca_fbtl_base_module_request_free_fn_t req_free_fn; + mca_ompio_post_next_subreq_t req_post_next_subreq; + struct mca_ompio_request_t *req_parent; + int req_num_subreqs; + int req_subreqs_completed; + ompio_file_t *req_fh; + bool req_post_followup; }; typedef struct mca_ompio_request_t mca_ompio_request_t; OBJ_CLASS_DECLARATION(mca_ompio_request_t); diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c index 99b030196a5..838d327e585 100644 --- a/ompi/mca/io/ompio/io_ompio_component.c +++ b/ompi/mca/io/ompio/io_ompio_component.c @@ -88,14 +88,6 @@ static int io_progress(void); static int priority_param = 30; static int delete_priority_param = 30; - -/* - * Global, component-wide OMPIO mutex because OMPIO is not thread safe - */ -opal_mutex_t mca_io_ompio_mutex = {{0}}; - - - /* * Public string showing this component's version number */ @@ -273,7 +265,7 @@ static int register_component(void) static int open_component(void) { /* Create the mutex */ - OBJ_CONSTRUCT(&mca_io_ompio_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&mca_common_ompio_mutex, opal_mutex_t); mca_common_ompio_request_init (); @@ -286,7 +278,7 @@ static int close_component(void) { mca_common_ompio_request_fini (); mca_common_ompio_buffer_alloc_fini(); - OBJ_DESTRUCT(&mca_io_ompio_mutex); + OBJ_DESTRUCT(&mca_common_ompio_mutex); return OMPI_SUCCESS; } @@ -352,9 +344,9 @@ static int delete_select(const char *filename, struct opal_info_t *info, { int ret; - OPAL_THREAD_LOCK (&mca_io_ompio_mutex); + OPAL_THREAD_LOCK (&mca_common_ompio_mutex); ret = mca_common_ompio_file_delete (filename, info); - OPAL_THREAD_UNLOCK (&mca_io_ompio_mutex); + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); return ret; }
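
Appended note (not part of the patch): the following is a minimal,
self-contained sketch of the parent/subrequest pipeline the patch
implements, under simplifying assumptions. All identifiers below
(parent_req, post_subreq, progress, PIPELINE_BUF_SIZE) are hypothetical
stand-ins for the OMPIO internals (mca_ompio_request_t,
mca_common_ompio_post_next_read_subreq, mca_common_ompio_progress, and
the pipeline_buffer_size MCA parameter); the first memcpy stands in for
build_io_array + fbtl_ipreadv, the second for the opal_convertor_unpack
step.

/*
 * Illustrative sketch only -- NOT part of the patch. It models a
 * pipelined iread: "posting" a subrequest stages the next chunk into a
 * bounce buffer, and each progress() call performs the two phases of
 * the real progress engine: (1) complete the in-flight subrequest and
 * unpack its data into the user buffer, (2) post the follow-up
 * subrequest. All names are hypothetical stand-ins.
 */
#include <stdio.h>
#include <string.h>

#define PIPELINE_BUF_SIZE 4   /* stand-in for the pipeline_buffer_size MCA param */

typedef struct {
    const char *file;         /* simulated file contents */
    char       *user_buf;     /* destination of the user-level iread */
    char        tbuf[PIPELINE_BUF_SIZE]; /* bounce buffer, reused every cycle */
    size_t      max_data;     /* total bytes requested */
    int         num_subreqs;  /* ceil(max_data / PIPELINE_BUF_SIZE) */
    int         subreqs_completed;
    int         in_flight;    /* 1 while a subrequest is "posted" */
} parent_req;

/* Length of chunk 'index'; the last chunk may be shorter than a full cycle. */
static size_t chunk_len(const parent_req *p, int index)
{
    return (index == p->num_subreqs - 1)
        ? p->max_data - (size_t)index * PIPELINE_BUF_SIZE
        : PIPELINE_BUF_SIZE;
}

/* Analogue of posting the next read subrequest for one index. */
static void post_subreq(parent_req *p, int index)
{
    if (index == p->num_subreqs) return;      /* all done */
    memcpy(p->tbuf, p->file + (size_t)index * PIPELINE_BUF_SIZE,
           chunk_len(p, index));              /* the "ipreadv" stand-in */
    p->in_flight = 1;
}

/* One invocation of the progress engine; returns 1 once the parent is done. */
static int progress(parent_req *p)
{
    if (p->in_flight) {                       /* phase 1: complete + unpack */
        int idx = p->subreqs_completed;
        memcpy(p->user_buf + (size_t)idx * PIPELINE_BUF_SIZE,
               p->tbuf, chunk_len(p, idx));   /* the "unpack" stand-in */
        p->subreqs_completed++;
        p->in_flight = 0;
    }
    if (p->subreqs_completed == p->num_subreqs)
        return 1;                             /* parent request complete */
    post_subreq(p, p->subreqs_completed);     /* phase 2: post follow-up */
    return 0;
}

int main(void)
{
    const char *data = "pipelined subrequests!";
    char out[32] = {0};
    parent_req p = { data, out, {0}, strlen(data), 0, 0, 0 };
    p.num_subreqs = (int)((p.max_data + PIPELINE_BUF_SIZE - 1) / PIPELINE_BUF_SIZE);

    post_subreq(&p, 0);            /* like mca_common_ompio_file_iread */
    while (!progress(&p))          /* like repeated opal_progress() calls */
        ;
    printf("read back: \"%s\" in %d subrequests\n", out, p.num_subreqs);
    return 0;
}

As in the patch's two-phase progress loop, a single progress() call
completes at most the one in-flight subrequest and then posts the
follow-up, so a multi-cycle operation is never drained within a single
invocation.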