@@ -1660,111 +1660,4 @@ mod tests {
16601660
16611661 Ok ( ( ) )
16621662 }
1663-
1664- #[ tokio:: test]
1665- async fn test_memory_reservation_column_parallel ( ) -> Result < ( ) > {
1666- async fn test_memory_reservation ( global : ParquetOptions ) -> Result < ( ) > {
1667- let field_a = Field :: new ( "a" , DataType :: Utf8 , false ) ;
1668- let field_b = Field :: new ( "b" , DataType :: Utf8 , false ) ;
1669- let schema = Arc :: new ( Schema :: new ( vec ! [ field_a, field_b] ) ) ;
1670- let object_store_url = ObjectStoreUrl :: local_filesystem ( ) ;
1671-
1672- let file_sink_config = FileSinkConfig {
1673- original_url : String :: default ( ) ,
1674- object_store_url : object_store_url. clone ( ) ,
1675- file_group : FileGroup :: new ( vec ! [ PartitionedFile :: new(
1676- "/tmp" . to_string( ) ,
1677- 1 ,
1678- ) ] ) ,
1679- table_paths : vec ! [ ListingTableUrl :: parse( "file:///" ) ?] ,
1680- output_schema : schema. clone ( ) ,
1681- table_partition_cols : vec ! [ ] ,
1682- insert_op : InsertOp :: Overwrite ,
1683- keep_partition_by_columns : false ,
1684- file_extension : "parquet" . into ( ) ,
1685- } ;
1686- let parquet_sink = Arc :: new ( ParquetSink :: new (
1687- file_sink_config,
1688- TableParquetOptions {
1689- key_value_metadata : std:: collections:: HashMap :: from ( [
1690- ( "my-data" . to_string ( ) , Some ( "stuff" . to_string ( ) ) ) ,
1691- ( "my-data-bool-key" . to_string ( ) , None ) ,
1692- ] ) ,
1693- global,
1694- ..Default :: default ( )
1695- } ,
1696- ) ) ;
1697-
1698- // create data
1699- let col_a: ArrayRef = Arc :: new ( StringArray :: from ( vec ! [ "foo" , "bar" ] ) ) ;
1700- let col_b: ArrayRef = Arc :: new ( StringArray :: from ( vec ! [ "baz" , "baz" ] ) ) ;
1701- let batch =
1702- RecordBatch :: try_from_iter ( vec ! [ ( "a" , col_a) , ( "b" , col_b) ] ) . unwrap ( ) ;
1703-
1704- // create task context
1705- let task_context = build_ctx ( object_store_url. as_ref ( ) ) ;
1706- assert_eq ! (
1707- task_context. memory_pool( ) . reserved( ) ,
1708- 0 ,
1709- "no bytes are reserved yet"
1710- ) ;
1711-
1712- let mut write_task = FileSink :: write_all (
1713- parquet_sink. as_ref ( ) ,
1714- Box :: pin ( RecordBatchStreamAdapter :: new (
1715- schema,
1716- bounded_stream ( batch, 1000 ) ,
1717- ) ) ,
1718- & task_context,
1719- ) ;
1720-
1721- // incrementally poll and check for memory reservation
1722- let mut reserved_bytes = 0 ;
1723- while futures:: poll!( & mut write_task) . is_pending ( ) {
1724- reserved_bytes += task_context. memory_pool ( ) . reserved ( ) ;
1725- tokio:: time:: sleep ( Duration :: from_micros ( 1 ) ) . await ;
1726- }
1727- assert ! (
1728- reserved_bytes > 0 ,
1729- "should have bytes reserved during write"
1730- ) ;
1731- assert_eq ! (
1732- task_context. memory_pool( ) . reserved( ) ,
1733- 0 ,
1734- "no leaking byte reservation"
1735- ) ;
1736-
1737- Ok ( ( ) )
1738- }
1739-
1740- let write_opts = ParquetOptions {
1741- allow_single_file_parallelism : false ,
1742- ..Default :: default ( )
1743- } ;
1744- test_memory_reservation ( write_opts)
1745- . await
1746- . expect ( "should track for non-parallel writes" ) ;
1747-
1748- let row_parallel_write_opts = ParquetOptions {
1749- allow_single_file_parallelism : true ,
1750- maximum_parallel_row_group_writers : 10 ,
1751- maximum_buffered_record_batches_per_stream : 1 ,
1752- ..Default :: default ( )
1753- } ;
1754- test_memory_reservation ( row_parallel_write_opts)
1755- . await
1756- . expect ( "should track for row-parallel writes" ) ;
1757-
1758- let col_parallel_write_opts = ParquetOptions {
1759- allow_single_file_parallelism : true ,
1760- maximum_parallel_row_group_writers : 1 ,
1761- maximum_buffered_record_batches_per_stream : 2 ,
1762- ..Default :: default ( )
1763- } ;
1764- test_memory_reservation ( col_parallel_write_opts)
1765- . await
1766- . expect ( "should track for column-parallel writes" ) ;
1767-
1768- Ok ( ( ) )
1769- }
17701663}
0 commit comments