Skip to content

Commit 13a661b

Browse files
committed
explanatory comments
1 parent f36ae17 commit 13a661b

File tree

4 files changed

+22
-9
lines changed

4 files changed

+22
-9
lines changed

server/Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ arrow-array = { version = "53.0.0" }
1515
arrow-json = "53.0.0"
1616
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
1717
arrow-select = "53.0.0"
18-
# datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
1918
datafusion = "42.0.0"
20-
object_store = { version = "0.11.0", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
19+
object_store = { version = "0.11.0", features = ["cloud", "aws"] }
2120
parquet = "53.0.0"
2221
arrow-flight = { version = "53.0.0", features = [ "tls" ] }
23-
tonic = {version = "0.12.1", features = ["tls", "transport", "gzip", "zstd"] }
24-
tonic-web = "0.12.1"
22+
tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
23+
tonic-web = "0.12.3"
2524
tower-http = { version = "0.6.1", features = ["cors"] }
2625

2726
### actix dependencies

server/src/cli.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,9 @@ impl Cli {
421421
.help("Set a fixed memory limit for query"),
422422
)
423423
.arg(
424+
// RowGroupSize controls the number of rows present in one row group
425+
// More rows = better compression but higher memory consumption during read/write
426+
// 1048576 is the default value for DataFusion
424427
Arg::new(Self::ROW_GROUP_SIZE)
425428
.long(Self::ROW_GROUP_SIZE)
426429
.env("P_PARQUET_ROW_GROUP_SIZE")

server/src/query.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,20 @@ impl Query {
8686
.with_prefer_existing_sort(true)
8787
.with_round_robin_repartition(true);
8888

89+
// For more details refer https://datafusion.apache.org/user-guide/configs.html
90+
91+
// Reduce the number of rows read (if possible)
8992
config.options_mut().execution.parquet.enable_page_index = true;
93+
94+
// Pushdown filters allow DF to push the filters as far down in the plan as possible
95+
// thus reducing the number of rows decoded
9096
config.options_mut().execution.parquet.pushdown_filters = true;
97+
98+
// Reorder filters allow DF to decide the order of filters, minimizing the cost of filter evaluation
9199
config.options_mut().execution.parquet.reorder_filters = true;
100+
101+
// Enable StringViewArray
102+
// https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
92103
config
93104
.options_mut()
94105
.execution

server/src/query/stream_schema_provider.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,9 @@ async fn collect_from_snapshot(
216216

217217
fn partitioned_files(
218218
manifest_files: Vec<catalog::manifest::File>,
219-
table_schema: &Schema,
220-
target_partition: usize,
219+
table_schema: &Schema
221220
) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
221+
let target_partition = num_cpus::get();
222222
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
223223
let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
224224
let mut count = 0;
@@ -436,7 +436,7 @@ impl TableProvider for StandardTableProvider {
436436
);
437437
}
438438

439-
let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
439+
let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
440440
let remote_exec = create_parquet_physical_plan(
441441
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
442442
partitioned_files,
@@ -520,7 +520,7 @@ async fn get_cache_exectuion_plan(
520520
})
521521
.collect();
522522

523-
let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1);
523+
let (partitioned_files, statistics) = partitioned_files(cached, &schema);
524524
let plan = create_parquet_physical_plan(
525525
ObjectStoreUrl::parse("file:///").unwrap(),
526526
partitioned_files,
@@ -571,7 +571,7 @@ async fn get_hottier_exectuion_plan(
571571
})
572572
.collect();
573573

574-
let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1);
574+
let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema);
575575
let plan = create_parquet_physical_plan(
576576
ObjectStoreUrl::parse("file:///").unwrap(),
577577
partitioned_files,

0 commit comments

Comments (0)