diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 38f6a2c76df6..fc5f8945c439 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -19,7 +19,8 @@ use arrow::array::{ builder::{Int64Builder, StringBuilder}, - Float32Array, Float64Array, RecordBatch, StringArray, UInt64Array, + ArrayRef, Float32Array, Float64Array, RecordBatch, StringArray, StringViewBuilder, + UInt64Array, }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::MemTable; @@ -158,6 +159,31 @@ pub fn create_record_batches( .collect::>() } +/// An enum that wraps either a regular StringBuilder or a StringViewBuilder +/// so that both can be used interchangeably. +enum TraceIdBuilder { + Utf8(StringBuilder), + Utf8View(StringViewBuilder), +} + +impl TraceIdBuilder { + /// Append a value to the builder. + fn append_value(&mut self, value: &str) { + match self { + TraceIdBuilder::Utf8(builder) => builder.append_value(value), + TraceIdBuilder::Utf8View(builder) => builder.append_value(value), + } + } + + /// Finish building and return the ArrayRef. 
+ fn finish(self) -> ArrayRef { + match self { + TraceIdBuilder::Utf8(mut builder) => Arc::new(builder.finish()), + TraceIdBuilder::Utf8View(mut builder) => Arc::new(builder.finish()), + } + } +} + /// Create time series data with `partition_cnt` partitions and `sample_cnt` rows per partition /// in ascending order, if `asc` is true, otherwise randomly sampled using a Pareto distribution #[allow(dead_code)] @@ -165,6 +191,7 @@ pub(crate) fn make_data( partition_cnt: i32, sample_cnt: i32, asc: bool, + use_view: bool, ) -> Result<(Arc, Vec>), DataFusionError> { // constants observed from trace data let simultaneous_group_cnt = 2000; @@ -177,11 +204,17 @@ pub(crate) fn make_data( let mut rng = rand::rngs::SmallRng::from_seed([0; 32]); // populate data - let schema = test_schema(); + let schema = test_schema(use_view); let mut partitions = vec![]; let mut cur_time = 16909000000000i64; for _ in 0..partition_cnt { - let mut id_builder = StringBuilder::new(); + // Choose the appropriate builder based on use_view. 
+ let mut id_builder = if use_view { + TraceIdBuilder::Utf8View(StringViewBuilder::new()) + } else { + TraceIdBuilder::Utf8(StringBuilder::new()) + }; + let mut ts_builder = Int64Builder::new(); let gen_id = |rng: &mut rand::rngs::SmallRng| { rng.gen::<[u8; 16]>() @@ -230,10 +263,19 @@ pub(crate) fn make_data( Ok((schema, partitions)) } -/// The Schema used by make_data -fn test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("trace_id", DataType::Utf8, false), - Field::new("timestamp_ms", DataType::Int64, false), - ])) +/// Returns a Schema based on the use_view flag +fn test_schema(use_view: bool) -> SchemaRef { + if use_view { + // Return Utf8View schema + Arc::new(Schema::new(vec![ + Field::new("trace_id", DataType::Utf8View, false), + Field::new("timestamp_ms", DataType::Int64, false), + ])) + } else { + // Return regular Utf8 schema + Arc::new(Schema::new(vec![ + Field::new("trace_id", DataType::Utf8, false), + Field::new("timestamp_ms", DataType::Int64, false), + ])) + } } diff --git a/datafusion/core/benches/distinct_query_sql.rs b/datafusion/core/benches/distinct_query_sql.rs index 4992ae660766..c7056aab8689 100644 --- a/datafusion/core/benches/distinct_query_sql.rs +++ b/datafusion/core/benches/distinct_query_sql.rs @@ -133,7 +133,8 @@ pub async fn create_context_sampled_data( partition_cnt: i32, sample_cnt: i32, ) -> Result<(Arc, Arc)> { - let (schema, parts) = make_data(partition_cnt, sample_cnt, false /* asc */).unwrap(); + let (schema, parts) = + make_data(partition_cnt, sample_cnt, false /* asc */, false).unwrap(); let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap()); // Create the DataFrame diff --git a/datafusion/core/benches/topk_aggregate.rs b/datafusion/core/benches/topk_aggregate.rs index 777d586b344c..cf3c7fa2e26f 100644 --- a/datafusion/core/benches/topk_aggregate.rs +++ b/datafusion/core/benches/topk_aggregate.rs @@ -33,8 +33,9 @@ async fn create_context( sample_cnt: i32, asc: bool, use_topk: bool, + 
use_view: bool, ) -> Result<(Arc, Arc)> { - let (schema, parts) = make_data(partition_cnt, sample_cnt, asc).unwrap(); + let (schema, parts) = make_data(partition_cnt, sample_cnt, asc, use_view).unwrap(); let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap()); // Create the DataFrame @@ -108,7 +109,7 @@ fn criterion_benchmark(c: &mut Criterion) { |b| { b.iter(|| { let real = rt.block_on(async { - create_context(limit, partitions, samples, false, false) + create_context(limit, partitions, samples, false, false, false) .await .unwrap() }); @@ -122,7 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) { |b| { b.iter(|| { let asc = rt.block_on(async { - create_context(limit, partitions, samples, true, false) + create_context(limit, partitions, samples, true, false, false) .await .unwrap() }); @@ -140,7 +141,7 @@ fn criterion_benchmark(c: &mut Criterion) { |b| { b.iter(|| { let topk_real = rt.block_on(async { - create_context(limit, partitions, samples, false, true) + create_context(limit, partitions, samples, false, true, false) .await .unwrap() }); @@ -158,7 +159,45 @@ fn criterion_benchmark(c: &mut Criterion) { |b| { b.iter(|| { let topk_asc = rt.block_on(async { - create_context(limit, partitions, samples, true, true) + create_context(limit, partitions, samples, true, true, false) + .await + .unwrap() + }); + run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true) + }) + }, + ); + + // Utf8View schema, time-series rows + c.bench_function( + format!( + "top k={limit} aggregate {} time-series rows [Utf8View]", + partitions * samples + ) + .as_str(), + |b| { + b.iter(|| { + let topk_real = rt.block_on(async { + create_context(limit, partitions, samples, false, true, true) + .await + .unwrap() + }); + run(&rt, topk_real.0.clone(), topk_real.1.clone(), false) + }) + }, + ); + + // Utf8View schema, worst-case rows + c.bench_function( + format!( + "top k={limit} aggregate {} worst-case rows [Utf8View]", + partitions * samples + ) + .as_str(), + |b| { + b.iter(|| { 
+ let topk_asc = rt.block_on(async { + create_context(limit, partitions, samples, true, true, true) .await .unwrap() });