@@ -124,7 +124,6 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf,
124124 size_t spc = 0 ;
125125 ssize_t ret_code = 0 ;
126126 int i = 0 ; /* index into the decoded iovec of the buffer */
127- int j = 0 ; /* index into the file via iovec */
128127
129128 mca_common_ompio_decode_datatype (fh , datatype , count , buf ,
130129 & max_data , fh -> f_mem_convertor ,
@@ -138,11 +137,10 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf,
138137 cycles , max_data );
139138#endif
140139
141- j = fh -> f_index_in_file_view ;
142140 for (index = 0 ; index < cycles ; index ++ ) {
143141 mca_common_ompio_build_io_array (fh , index , cycles , bytes_per_cycle ,
144142 max_data , iov_count , decoded_iov ,
145- & i , & j , & total_bytes_read , & spc ,
143+ & i , & total_bytes_read , & spc ,
146144 & fh -> f_io_array , & fh -> f_num_of_io_entries );
147145 if (fh -> f_num_of_io_entries == 0 ) {
148146 ret_code = 0 ;
@@ -188,7 +186,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf,
188186 size_t spc = 0 ;
189187 ssize_t ret_code = 0 ;
190188 int i = 0 ; /* index into the decoded iovec of the buffer */
191- int j = 0 ; /* index into the file via iovec */
192189
193190 char * tbuf1 = NULL , * tbuf2 = NULL ;
194191 char * unpackbuf = NULL , * readbuf = NULL ;
@@ -236,7 +233,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf,
236233 ** - unpack buffer i
237234 */
238235
239- j = fh -> f_index_in_file_view ;
240236 if (can_overlap ) {
241237 mca_common_ompio_register_progress ();
242238 }
@@ -255,7 +251,7 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf,
255251
256252 mca_common_ompio_build_io_array (fh , index , cycles , bytes_per_cycle ,
257253 bytes_this_cycle , iov_count ,
258- decoded_iov , & i , & j , & tbr , & spc ,
254+ decoded_iov , & i , & tbr , & spc ,
259255 & fh -> f_io_array , & fh -> f_num_of_io_entries );
260256 if (fh -> f_num_of_io_entries == 0 ) {
261257 ret_code = 0 ;
@@ -352,53 +348,103 @@ int mca_common_ompio_file_read_at (ompio_file_t *fh,
352348 return ret ;
353349}
354350
351+ static void mca_common_ompio_post_next_read_subreq (struct mca_ompio_request_t * req , int index )
352+ {
353+ uint32_t iov_count = 1 ;
354+ size_t pos = 0 , spc = 0 , tbw = 0 ;
355+ int i = 0 ;
356+ mca_ompio_request_t * ompio_subreq = NULL ;
357+ size_t bytes_per_cycle = OMPIO_MCA_GET (req -> req_fh , pipeline_buffer_size );
358+
359+ struct iovec * decoded_iov = (struct iovec * ) malloc (sizeof (struct iovec ));
360+ if ( NULL == decoded_iov ) {
361+ opal_output (1 , "common_ompio: could not allocate memory.\n" );
362+ return ;
363+ }
364+
365+ /* Step 1: finish index-1 unpack operation */
366+ if (index - 1 >= 0 ) {
367+ size_t num_bytes = bytes_per_cycle ;
368+ /**
369+ * should really be 'req_num_subreqs -1 == index -1'
370+ * which is the same as below.
371+ */
372+ if (req -> req_num_subreqs == index ) {
373+ num_bytes = req -> req_max_data - (index - 1 )* bytes_per_cycle ;
374+ }
375+ decoded_iov -> iov_base = req -> req_tbuf ;
376+ decoded_iov -> iov_len = num_bytes ;
377+ opal_convertor_unpack (& req -> req_convertor , decoded_iov , & iov_count , & pos );
378+ }
379+
380+ /* Step 2: post next iread subrequest */
381+ if (req -> req_num_subreqs == index ) {
382+ /* all done */
383+ free (decoded_iov );
384+ return ;
385+ }
386+
387+ decoded_iov -> iov_base = req -> req_tbuf ;
388+ decoded_iov -> iov_len = (req -> req_num_subreqs - 1 == index ) ?
389+ req -> req_max_data - (index * bytes_per_cycle ) : req -> req_size ;
390+ mca_common_ompio_build_io_array (req -> req_fh , index , req -> req_num_subreqs ,
391+ bytes_per_cycle , decoded_iov -> iov_len ,
392+ iov_count , decoded_iov ,
393+ & i , & tbw , & spc ,
394+ & req -> req_fh -> f_io_array ,
395+ & req -> req_fh -> f_num_of_io_entries );
396+
397+ mca_common_ompio_request_alloc ( & ompio_subreq , MCA_OMPIO_REQUEST_READ );
398+ ompio_subreq -> req_parent = req ;
399+ req -> req_fh -> f_fbtl -> fbtl_ipreadv (req -> req_fh , (ompi_request_t * )ompio_subreq );
400+
401+ free (req -> req_fh -> f_io_array );
402+ req -> req_fh -> f_io_array = NULL ;
403+ req -> req_fh -> f_num_of_io_entries = 0 ;
404+
405+ free (decoded_iov );
406+ }
355407
356408int mca_common_ompio_file_iread (ompio_file_t * fh ,
357- void * buf ,
358- int count ,
359- struct ompi_datatype_t * datatype ,
360- ompi_request_t * * request )
409+ void * buf ,
410+ int count ,
411+ struct ompi_datatype_t * datatype ,
412+ ompi_request_t * * request )
361413{
362414 int ret = OMPI_SUCCESS ;
363415 mca_ompio_request_t * ompio_req = NULL ;
364- size_t spc = 0 ;
416+ struct iovec * decoded_iov = NULL ;
365417
366418 if (fh -> f_amode & MPI_MODE_WRONLY ){
367- // opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
368419 ret = MPI_ERR_ACCESS ;
369- return ret ;
420+ return ret ;
370421 }
371422
372423 mca_common_ompio_request_alloc ( & ompio_req , MCA_OMPIO_REQUEST_READ );
373424
374- if ( 0 == count ) {
425+ if (0 == count || 0 == fh -> f_iov_count ) {
375426 ompio_req -> req_ompi .req_status .MPI_ERROR = OMPI_SUCCESS ;
376427 ompio_req -> req_ompi .req_status ._ucount = 0 ;
377428 ompi_request_complete (& ompio_req -> req_ompi , false);
378429 * request = (ompi_request_t * ) ompio_req ;
379-
430+
380431 return OMPI_SUCCESS ;
381432 }
382433
383434 if ( NULL != fh -> f_fbtl -> fbtl_ipreadv ) {
384435 // This fbtl has support for non-blocking operations
385-
386- size_t total_bytes_read = 0 ; /* total bytes that have been read*/
387436 uint32_t iov_count = 0 ;
388- struct iovec * decoded_iov = NULL ;
389-
390437 size_t max_data = 0 ;
391- int i = 0 ; /* index into the decoded iovec of the buffer */
392- int j = 0 ; /* index into the file via iovec */
393-
394438 bool need_to_copy = false;
395-
396439 int is_gpu , is_managed ;
397- mca_common_ompio_check_gpu_buf ( fh , buf , & is_gpu , & is_managed );
398- if ( is_gpu && !is_managed ) {
440+
441+ mca_common_ompio_check_gpu_buf (fh , buf , & is_gpu , & is_managed );
442+ if (is_gpu && !is_managed ) {
399443 need_to_copy = true;
400444 }
401445
446+ mca_common_ompio_register_progress ();
447+
402448 if ( !( fh -> f_flags & OMPIO_DATAREP_NATIVE ) &&
403449 !(datatype == & ompi_mpi_byte .dt ||
404450 datatype == & ompi_mpi_char .dt )) {
@@ -411,88 +457,61 @@ int mca_common_ompio_file_iread (ompio_file_t *fh,
411457 need_to_copy = true;
412458 }
413459
414- if ( need_to_copy ) {
415- char * tbuf = NULL ;
460+ if (need_to_copy ) {
461+ size_t pipeline_buf_size = OMPIO_MCA_GET ( fh , pipeline_buffer_size ) ;
416462
417- OMPIO_PREPARE_READ_BUF (fh , buf , count , datatype , tbuf , & ompio_req -> req_convertor ,
418- max_data , 0 , decoded_iov , iov_count );
419-
420- ompio_req -> req_tbuf = tbuf ;
421- ompio_req -> req_size = max_data ;
463+ OMPIO_PREPARE_READ_BUF (fh , buf , count , datatype , ompio_req -> req_tbuf ,
464+ & ompio_req -> req_convertor , max_data ,
465+ pipeline_buf_size , decoded_iov , iov_count );
466+
467+ ompio_req -> req_num_subreqs = ceil ((double )max_data /pipeline_buf_size );
468+ ompio_req -> req_size = pipeline_buf_size ;
469+ ompio_req -> req_max_data = max_data ;
470+ ompio_req -> req_post_next_subreq = mca_common_ompio_post_next_read_subreq ;
471+ ompio_req -> req_fh = fh ;
472+ ompio_req -> req_ompi .req_status .MPI_ERROR = MPI_SUCCESS ;
473+
474+ mca_common_ompio_post_next_read_subreq (ompio_req , 0 );
422475 }
423476 else {
424- mca_common_ompio_decode_datatype (fh ,
425- datatype ,
426- count ,
427- buf ,
428- & max_data ,
429- fh -> f_mem_convertor ,
430- & decoded_iov ,
431- & iov_count );
432- }
433-
434- if ( 0 < max_data && 0 == fh -> f_iov_count ) {
435- ompio_req -> req_ompi .req_status .MPI_ERROR = OMPI_SUCCESS ;
436- ompio_req -> req_ompi .req_status ._ucount = 0 ;
437- ompi_request_complete (& ompio_req -> req_ompi , false);
438- * request = (ompi_request_t * ) ompio_req ;
439- if (NULL != decoded_iov ) {
440- free (decoded_iov );
441- decoded_iov = NULL ;
442- }
443-
444- return OMPI_SUCCESS ;
445- }
446-
447- // Non-blocking operations have to occur in a single cycle
448- j = fh -> f_index_in_file_view ;
449-
450- mca_common_ompio_build_io_array ( fh ,
451- 0 , // index
452- 1 , // no. of cyces
453- max_data , // setting bytes per cycle to match data
454- max_data ,
455- iov_count ,
456- decoded_iov ,
457- & i ,
458- & j ,
459- & total_bytes_read ,
460- & spc ,
461- & fh -> f_io_array ,
462- & fh -> f_num_of_io_entries );
463-
464- if (fh -> f_num_of_io_entries ) {
465- fh -> f_fbtl -> fbtl_ipreadv (fh , (ompi_request_t * ) ompio_req );
466- }
467-
468- mca_common_ompio_register_progress ();
469-
470- fh -> f_num_of_io_entries = 0 ;
471- if (NULL != fh -> f_io_array ) {
472- free (fh -> f_io_array );
473- fh -> f_io_array = NULL ;
474- }
477+ int i = 0 ;
478+ size_t spc = 0 , tbr = 0 ;
479+ mca_common_ompio_decode_datatype (fh , datatype , count , buf ,
480+ & max_data , fh -> f_mem_convertor ,
481+ & decoded_iov , & iov_count );
482+
483+ /**
484+ * Non-blocking operations have to occur in a single cycle
485+ * If the f_io_array is too long, the fbtl will chunk it up
486+ * internally.
487+ */
488+ mca_common_ompio_build_io_array (fh , 0 , 1 , max_data , max_data ,
489+ iov_count , decoded_iov , & i ,
490+ & tbr , & spc ,
491+ & fh -> f_io_array , & fh -> f_num_of_io_entries );
475492
476- if (NULL != decoded_iov ) {
477- free (decoded_iov );
478- decoded_iov = NULL ;
479- }
493+ fh -> f_fbtl -> fbtl_ipreadv (fh , (ompi_request_t * ) ompio_req );
494+ }
480495 }
481496 else {
482- // This fbtl does not support non-blocking operations
483- ompi_status_public_t status ;
484- ret = mca_common_ompio_file_read (fh , buf , count , datatype , & status );
497+ // This fbtl does not support non-blocking operations
498+ ompi_status_public_t status ;
499+ ret = mca_common_ompio_file_read (fh , buf , count , datatype , & status );
485500
486- ompio_req -> req_ompi .req_status .MPI_ERROR = ret ;
487- ompio_req -> req_ompi .req_status ._ucount = status ._ucount ;
488- ompi_request_complete (& ompio_req -> req_ompi , false);
501+ ompio_req -> req_ompi .req_status .MPI_ERROR = ret ;
502+ ompio_req -> req_ompi .req_status ._ucount = status ._ucount ;
503+ ompi_request_complete (& ompio_req -> req_ompi , false);
489504 }
490505
506+ fh -> f_num_of_io_entries = 0 ;
507+ free (fh -> f_io_array );
508+ fh -> f_io_array = NULL ;
509+ free (decoded_iov );
510+
491511 * request = (ompi_request_t * ) ompio_req ;
492512 return ret ;
493513}
494514
495-
496515int mca_common_ompio_file_iread_at (ompio_file_t * fh ,
497516 OMPI_MPI_OFFSET_TYPE offset ,
498517 void * buf ,
0 commit comments