From 3718039f6a69c04684cce0cd8b1d98d5a03cca89 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Wed, 5 Nov 2025 14:36:12 +0300
Subject: [PATCH 1/2] Save metrics during query execution

---
 .gitignore                                   |   1 +
 crates/core-executor/src/error.rs            |   8 +
 crates/core-executor/src/query.rs            | 109 ++++++++++-
 crates/core-history/src/entities/mod.rs      |   2 +
 .../src/entities/query_metrics.rs            |  33 ++++
 crates/core-history/src/errors.rs            |   7 +
 crates/core-history/src/interface.rs         |   4 +-
 .../core-history/src/sqlite_history_store.rs | 174 ++++++++++++++++--
 crates/embucketd/src/cli.rs                  |   7 +
 crates/embucketd/src/main.rs                 |   1 +
 10 files changed, 324 insertions(+), 22 deletions(-)
 create mode 100644 crates/core-history/src/entities/query_metrics.rs

diff --git a/.gitignore b/.gitignore
index b856da38f..6092d173a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 # will have compiled files and executables
 debug/
 target/
+sqlite_data/
 object_store/
 test/dbt_integration_tests/dbt-gitlab/logs/
 test/slt_errors_stats_embucket.csv
diff --git a/crates/core-executor/src/error.rs b/crates/core-executor/src/error.rs
index 78c1d728f..41fe59997 100644
--- a/crates/core-executor/src/error.rs
+++ b/crates/core-executor/src/error.rs
@@ -596,6 +596,14 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Query History metrics error: {source}"))]
+    QueryHistoryMetrics {
+        #[snafu(source(from(core_history::errors::Error, Box::new)))]
+        source: Box<core_history::errors::Error>,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Query {} cancelled", query_id.as_uuid()))]
     QueryCancelled {
         query_id: QueryRecordId,
diff --git a/crates/core-executor/src/query.rs b/crates/core-executor/src/query.rs
index 6b1b0c406..92034b195 100644
--- a/crates/core-executor/src/query.rs
+++ b/crates/core-executor/src/query.rs
@@ -25,7 +25,8 @@ use crate::duckdb::query::{
 };
 use crate::error::{OperationOn, OperationType};
 use crate::models::{QueryContext, QueryResult};
-use core_history::HistoryStore;
+use core_history::query_metrics::QueryMetric;
+use core_history::{HistoryStore, QueryRecordId};
 use core_metastore::{
     AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
     SchemaIdent as MetastoreSchemaIdent, TableCreateRequest as MetastoreTableCreateRequest,
@@ -71,15 +72,15 @@ use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
 use datafusion_expr::planner::ContextProvider;
 use datafusion_expr::{
     BinaryExpr, CreateMemoryTable, DdlStatement, Expr as DFExpr, ExprSchemable, Extension,
-    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast, and,
-    build_join_schema, is_null, lit, when,
+    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast,
+    UserDefinedLogicalNode, and, build_join_schema, is_null, lit, when,
 };
 use datafusion_iceberg::DataFusionTable;
 use datafusion_iceberg::catalog::catalog::IcebergCatalog;
 use datafusion_iceberg::catalog::mirror::Mirror;
 use datafusion_iceberg::catalog::schema::IcebergSchema;
 use datafusion_iceberg::table::DataFusionTableConfigBuilder;
-use datafusion_physical_plan::collect;
+use datafusion_physical_plan::{ExecutionPlan, collect, displayable};
 use df_catalog::catalog::CachingCatalog;
 use df_catalog::catalog_list::CachedEntity;
 use df_catalog::table::CachingTable;
@@ -92,7 +93,9 @@ use embucket_functions::visitors::{
     table_functions_cte_relation, timestamp, top_limit,
     unimplemented::functions_checker::visit as unimplemented_functions_checker,
 };
+use futures::FutureExt;
 use futures::TryStreamExt;
+use futures::future::join_all;
 use iceberg_rust::catalog::Catalog;
 use iceberg_rust::catalog::create::CreateTableBuilder;
 use iceberg_rust::catalog::identifier::Identifier;
@@ -108,6 +111,7 @@ use iceberg_rust::spec::values::Value as IcebergValue;
 use iceberg_rust::table::manifest_list::snapshot_partition_bounds;
 use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey as S3Key, resolve_bucket_region};
 use object_store::{ClientOptions, ObjectStore};
+use serde_json::json;
 use snafu::{OptionExt, ResultExt, location};
 use sqlparser::ast::helpers::key_value_options::KeyValueOptions;
 use sqlparser::ast::helpers::stmt_data_loading::StageParamsObject;
@@ -124,7 +128,12 @@ use std::ops::ControlFlow;
 use std::result::Result as StdResult;
 use std::str::FromStr;
 use std::sync::Arc;
-use tracing::Instrument;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Duration;
+use tokio::task::AbortHandle;
+use tokio::time::sleep;
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, error};
 use tracing_attributes::instrument;
 use url::Url;
 
@@ -2394,6 +2403,7 @@ impl UserQuery {
     async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<QueryResult> {
         let session = self.session.clone();
         let query_id = self.query_context.query_id;
+        let history_store = Arc::clone(&self.history_store);
 
         let span = tracing::debug_span!("UserQuery::execute_logical_plan");
 
@@ -2406,11 +2416,42 @@ impl UserQuery {
             .ctx
             .execute_logical_plan(plan)
             .await
-            .context(ex_error::DataFusionSnafu)?
-            .collect()
+            .context(ex_error::DataFusionSnafu)?;
+
+        let plan = records
+            .create_physical_plan()
+            .await
+            .context(ex_error::DataFusionSnafu)?;
+
+        // Run background job to save physical plan metrics
+        let token = CancellationToken::new();
+        let bg_token = token.clone();
+        let metrics_plan = Arc::clone(&plan);
+        tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    _ = sleep(Duration::from_secs(1)) => {
+                        if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
+                            error!("Failed to save intermediate plan metrics: {:?}", e);
+                        }
+                    }
+                    _ = bg_token.cancelled() => {
+                        if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
+                            error!("Failed to save final plan metrics: {:?}", e);
+                        }
+                        break;
+                    }
+                }
+            }
+        });
+
+        let task_ctx = session.ctx.task_ctx();
+        let records = collect(plan, task_ctx)
             .instrument(span)
             .await
             .context(ex_error::DataFusionSnafu)?;
+        // Stop metrics background job
+        token.cancel();
         if !records.is_empty() {
             schema = records[0].schema().as_ref().clone();
         }
@@ -3739,3 +3780,57 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
         table: Arc::from(table_ref.table.to_ascii_lowercase()),
     }
 }
+
+pub async fn save_plan_metrics(
+    history_store: Arc<dyn HistoryStore>,
+    query_id: QueryRecordId,
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Result<()> {
+    let counter = AtomicUsize::new(0);
+    let mut collected = Vec::new();
+
+    collect_plan_metrics(query_id, plan, None, 0, &counter, &mut collected);
+
+    tracing::debug!(
+        "Collected {} metrics from plan, saving to metrics store...",
+        collected.len()
+    );
+    history_store
+        .add_query_metrics_batch(&collected)
+        .await
+        .context(ex_error::QueryHistoryMetricsSnafu)
+}
+
+/// Recursively collect metrics into a vector (non-async).
+fn collect_plan_metrics(
+    query_id: QueryRecordId,
+    plan: &Arc<dyn ExecutionPlan>,
+    parent: Option<usize>,
+    level: usize,
+    counter: &AtomicUsize,
+    out: &mut Vec<QueryMetric>,
+) {
+    let node_id = counter.fetch_add(1, Ordering::SeqCst);
+
+    let metrics_json = if let Some(metrics) = plan.metrics() {
+        json!({
+            "spill_count": metrics.spill_count(),
+            "output_rows": metrics.output_rows(),
+            "elapsed_compute": metrics.elapsed_compute(),
+        })
+    } else {
+        json!({})
+    };
+
+    out.push(QueryMetric::new(
+        query_id,
+        node_id,
+        parent,
+        plan.name(),
+        metrics_json,
+    ));
+
+    for child in plan.children() {
+        collect_plan_metrics(query_id, &child, Some(node_id), level + 1, counter, out);
+    }
+}
diff --git a/crates/core-history/src/entities/mod.rs b/crates/core-history/src/entities/mod.rs
index c50bb4af6..75ef77263 100644
--- a/crates/core-history/src/entities/mod.rs
+++ b/crates/core-history/src/entities/mod.rs
@@ -1,6 +1,7 @@
 pub mod query;
 pub mod query_id;
 pub mod query_id_param;
+pub mod query_metrics;
 pub mod result_set;
 pub mod worksheet;
 pub mod worksheet_query_ref;
@@ -8,6 +9,7 @@ pub mod worksheet_query_ref;
 pub use query::*;
 pub use query_id::*;
 pub use query_id_param::*;
+pub use query_metrics::*;
 pub use result_set::*;
 pub use worksheet::*;
 pub use worksheet_query_ref::*;
diff --git a/crates/core-history/src/entities/query_metrics.rs b/crates/core-history/src/entities/query_metrics.rs
new file mode 100644
index 000000000..21ca07b39
--- /dev/null
+++ b/crates/core-history/src/entities/query_metrics.rs
@@ -0,0 +1,33 @@
+use crate::QueryRecordId;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct QueryMetric {
+    pub query_id: QueryRecordId,
+    pub node_id: usize,
+    pub parent_node_id: Option<usize>,
+    pub operator: String,
+    pub metrics: serde_json::Value, // serialized metrics as JSON object
+    pub created_at: DateTime<Utc>,
+}
+
+impl QueryMetric {
+    #[must_use]
+    pub fn new(
+        query_id: QueryRecordId,
+        node_id: usize,
+        parent_node_id: Option<usize>,
+        operator: &str,
+        metrics: serde_json::Value,
+    ) -> Self {
+        Self {
+            query_id,
+            node_id,
+            parent_node_id,
+            operator: operator.to_string(),
+            metrics,
+            created_at: Utc::now(),
+        }
+    }
+}
diff --git a/crates/core-history/src/errors.rs b/crates/core-history/src/errors.rs
index a1e84dbaa..4b5b29aa7 100644
--- a/crates/core-history/src/errors.rs
+++ b/crates/core-history/src/errors.rs
@@ -76,6 +76,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Error adding query metrics: {source}"))]
+    MetricsAdd {
+        source: core_utils::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Error adding query record: {source}"))]
     QueryAdd {
         source: core_utils::Error,
diff --git a/crates/core-history/src/interface.rs b/crates/core-history/src/interface.rs
index d2dccb39a..ac7fda34e 100644
--- a/crates/core-history/src/interface.rs
+++ b/crates/core-history/src/interface.rs
@@ -1,6 +1,6 @@
 use crate::ResultSet;
 use crate::errors::Result;
-use crate::{QueryRecord, QueryRecordId, QueryStatus, Worksheet, WorksheetId};
+use crate::{QueryMetric, QueryRecord, QueryRecordId, QueryStatus, Worksheet, WorksheetId};
 use async_trait::async_trait;
 
 #[derive(Debug, Clone)]
@@ -84,4 +84,6 @@ pub trait HistoryStore: std::fmt::Debug + Send + Sync {
     fn new_query_record(&self, query: &str, worksheet_id: Option<WorksheetId>) -> QueryRecord;
     async fn save_query_record(&self, query_record: &QueryRecord, result_set: Option<ResultSet>);
     async fn get_query_result(&self, query_record_id: QueryRecordId) -> Result<ResultSet>;
+    async fn add_query_metrics(&self, metric: &QueryMetric) -> Result<()>;
+    async fn add_query_metrics_batch(&self, metrics: &[QueryMetric]) -> Result<()>;
 }
diff --git a/crates/core-history/src/sqlite_history_store.rs b/crates/core-history/src/sqlite_history_store.rs
index f7faf74c4..0855cc008 100644
--- a/crates/core-history/src/sqlite_history_store.rs
+++ b/crates/core-history/src/sqlite_history_store.rs
@@ -1,6 +1,7 @@
 use crate::ResultSet;
 use crate::errors::{self as history_err, Result};
 use crate::interface::{GetQueriesParams, HistoryStore};
+use crate::query_metrics::QueryMetric;
 use crate::{QueryRecord, QueryRecordId, QueryStatus, Worksheet, WorksheetId};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -45,6 +46,17 @@ CREATE TABLE IF NOT EXISTS queries (
     FOREIGN KEY (worksheet_id) REFERENCES worksheets (id) ON DELETE SET NULL
 );";
 
+const METRICS_CREATE_TABLE: &str = "
+CREATE TABLE IF NOT EXISTS metrics (
+    id INTEGER PRIMARY KEY AUTOINCREMENT, -- unique metric entry ID
+    query_id TEXT NOT NULL,               -- UUID of the query this metric belongs to
+    node_id INTEGER NOT NULL,             -- node identifier within the execution plan
+    parent_node_id INTEGER,               -- optional parent node
+    operator TEXT NOT NULL,               -- operator name (e.g. HashJoinExec, FilterExec)
+    metrics JSON NOT NULL,                -- metrics serialized as JSON
+    created_at TEXT NOT NULL              -- stored as ISO8601 timestamp
+);";
+
 const WORKSHEET_ADD: &str = "
 INSERT INTO worksheets (id, name, content, created_at, updated_at)
     VALUES (:id, :name, :content, :created_at, :updated_at);
@@ -53,6 +65,7 @@ INSERT INTO worksheets (id, name, content, created_at, updated_at)
 pub struct SlateDBHistoryStore {
     pub queries_db: SqliteDb,
     pub results_db: SqliteDb,
+    pub metrics_db: SqliteDb,
 }
 
 impl std::fmt::Debug for SlateDBHistoryStore {
@@ -67,13 +80,13 @@ impl SlateDBHistoryStore {
         db: core_utils::Db,
         history_db_name: String,
         results_db_name: String,
+        metrics_db_name: String,
     ) -> Result<Self> {
         // try creating dirs for every separate db file
-        if let Some(dir_path) = std::path::Path::new(&history_db_name).parent() {
-            std::fs::create_dir_all(dir_path).context(history_err::CreateDirSnafu)?;
-        }
-        if let Some(dir_path) = std::path::Path::new(&results_db_name).parent() {
-            std::fs::create_dir_all(dir_path).context(history_err::CreateDirSnafu)?;
+        for path in [&history_db_name, &results_db_name, &metrics_db_name] {
+            if let Some(dir_path) = std::path::Path::new(path).parent() {
+                std::fs::create_dir_all(dir_path).context(history_err::CreateDirSnafu)?;
+            }
         }
 
         let history_store = Self {
@@ -83,6 +96,9 @@ impl SlateDBHistoryStore {
             results_db: SqliteDb::new(db.slate_db(), &results_db_name)
                 .await
                 .expect("Failed to initialize sqlite store"),
+            metrics_db: SqliteDb::new(db.slate_db(), &metrics_db_name)
+                .await
+                .expect("Failed to initialize sqlite store"),
         };
         history_store.create_tables().await?;
         Ok(history_store)
@@ -100,6 +116,8 @@ impl SlateDBHistoryStore {
             .map_or("", |s| s.split("::").last().unwrap_or(""));
         let queries_db_name = format!("file:{thread_name}?mode=memory");
         let results_db_name = format!("file:{thread_name}_r?mode=memory");
+        let metrics_db_name = format!("file:{thread_name}_m?mode=memory");
+
         let store = Self {
             queries_db: SqliteDb::new(utils_db.slate_db(), &queries_db_name)
                 .await
@@ -107,6 +125,9 @@ impl SlateDBHistoryStore {
             results_db: SqliteDb::new(utils_db.slate_db(), &results_db_name)
                 .await
                 .expect("Failed to create SqliteDb for results"),
+            metrics_db: SqliteDb::new(utils_db.slate_db(), &metrics_db_name)
+                .await
+                .expect("Failed to create SqliteDb for metrics"),
         };
         store
             .create_tables()
@@ -137,6 +158,12 @@ impl SlateDBHistoryStore {
             .await
             .context(core_utils_err::CoreSqliteSnafu)
             .context(history_err::CoreUtilsSnafu)?;
+        let metrics_connection = self
+            .metrics_db
+            .conn()
+            .await
+            .context(core_utils_err::CoreSqliteSnafu)
+            .context(history_err::CoreUtilsSnafu)?;
 
         let result = tokio::try_join!(
             queries_connection.interact(|conn| -> SqlResult<usize> {
@@ -147,9 +174,12 @@ impl SlateDBHistoryStore {
             }),
             results_connection
                 .interact(|conn| -> SqlResult<usize> { conn.execute(RESULTS_CREATE_TABLE, []) }),
+            metrics_connection
+                .interact(|conn| -> SqlResult<usize> { conn.execute(METRICS_CREATE_TABLE, []) }),
         )?;
         result.0.context(history_err::CreateTablesSnafu)?;
         result.1.context(history_err::CreateTablesSnafu)?;
+        result.2.context(history_err::CreateTablesSnafu)?;
 
         tracing::Span::current().record("ok", true);
         Ok(())
@@ -374,7 +404,7 @@ impl HistoryStore for SlateDBHistoryStore {
             "INSERT INTO queries (
                 id,
                 worksheet_id,
-                result_id, 
+                result_id,
                 query,
                 start_time,
                 end_time,
@@ -386,7 +416,7 @@ impl HistoryStore for SlateDBHistoryStore {
             VALUES (
                 :id,
                 :worksheet_id,
-                :result_id, 
+                :result_id,
                 :query,
                 :start_time,
                 :end_time,
@@ -462,14 +492,14 @@ impl HistoryStore for SlateDBHistoryStore {
 
         let update_future = queries_conn.interact(move |conn| -> SqlResult<usize> {
             conn.execute(
-                "UPDATE queries SET 
-                    status = :status, 
-                    end_time = :end_time, 
-                    duration_ms = :duration_ms, 
-                    result_count = :result_count, 
+                "UPDATE queries SET
+                    status = :status,
+                    end_time = :end_time,
+                    duration_ms = :duration_ms,
+                    result_count = :result_count,
                     result_id = :result_id,
-                    error = :error, 
-                    diagnostic_error = :diagnostic_error 
+                    error = :error,
+                    diagnostic_error = :diagnostic_error
                 WHERE id = :id",
                 named_params! {
                     ":status": q_status,
@@ -790,6 +820,122 @@ impl HistoryStore for SlateDBHistoryStore {
 
         ResultSet::try_from(raw_result)
     }
+
+    #[instrument(
+        name = "SqliteHistoryStore::add_query_metrics",
+        level = "debug",
+        skip(self, metric),
+        fields(ok, metric = format!("{metric:#?}")),
+        err
+    )]
+    async fn add_query_metrics(&self, metric: &QueryMetric) -> Result<()> {
+        let conn = self
+            .metrics_db
+            .conn()
+            .await
+            .context(core_utils_err::CoreSqliteSnafu)
+            .context(history_err::MetricsAddSnafu)?;
+
+        let m = metric.clone();
+
+        conn.interact(move |conn| -> SqlResult<usize> {
+            conn.execute(
+                "INSERT INTO metrics (
+                    query_id,
+                    node_id,
+                    parent_node_id,
+                    operator,
+                    metrics,
+                    created_at
+                ) VALUES (
+                    :query_id,
+                    :node_id,
+                    :parent_node_id,
+                    :operator,
+                    :metrics,
+                    :created_at
+                )",
+                named_params! {
+                    ":query_id": m.query_id.to_string(),
+                    ":node_id": m.node_id,
+                    ":parent_node_id": m.parent_node_id,
+                    ":operator": m.operator,
+                    ":metrics": serde_json::to_string(&m.metrics).unwrap_or_else(|_| "{}".to_string()),
+                    ":created_at": m.created_at.to_rfc3339(),
+                },
+            )
+        })
+        .await?
+        .context(core_utils_err::RuSqliteSnafu)
+        .context(history_err::MetricsAddSnafu)?;
+        tracing::Span::current().record("ok", true);
+        Ok(())
+    }
+
+    #[instrument(
+        name = "SqliteHistoryStore::add_query_metrics_batch",
+        level = "debug",
+        skip(self, metrics),
+        fields(count = metrics.len()),
+        err
+    )]
+    async fn add_query_metrics_batch(&self, metrics: &[QueryMetric]) -> Result<()> {
+        if metrics.is_empty() {
+            return Ok(());
+        }
+        let conn = self
+            .metrics_db
+            .conn()
+            .await
+            .context(core_utils_err::CoreSqliteSnafu)
+            .context(history_err::MetricsAddSnafu)?;
+
+        let batch = metrics.to_vec();
+
+        conn.interact(move |conn| -> SqlResult<()> {
+            let tx = conn.transaction()?;
+
+            {
+                let mut stmt = tx.prepare(
+                    "INSERT INTO metrics (
+                        query_id,
+                        node_id,
+                        parent_node_id,
+                        operator,
+                        metrics,
+                        created_at
+                    ) VALUES (
+                        :query_id,
+                        :node_id,
+                        :parent_node_id,
+                        :operator,
+                        :metrics,
+                        :created_at
+                    )",
+                )?;
+
+                for m in &batch {
+                    stmt.execute(named_params! {
+                        ":query_id": m.query_id.to_string(),
+                        ":node_id": m.node_id,
+                        ":parent_node_id": m.parent_node_id,
+                        ":operator": m.operator,
+                        ":metrics": serde_json::to_string(&m.metrics).unwrap_or_else(|_| "{}".to_string()),
+                        ":created_at": m.created_at.to_rfc3339(),
+                    })?;
+                }
+            }
+
+            tx.commit()?;
+            Ok(())
+        })
+        .await?
+        .context(core_utils_err::RuSqliteSnafu)
+        .context(history_err::MetricsAddSnafu)?;
+
+        tracing::Span::current().record("ok", true);
+        Ok(())
+    }
 }
 
 fn parse_query_record_id(id: &str) -> SqlResult<QueryRecordId> {
diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs
index 5889d15bf..868269427 100644
--- a/crates/embucketd/src/cli.rs
+++ b/crates/embucketd/src/cli.rs
@@ -248,6 +248,13 @@ pub struct CliOpts {
     )]
     pub query_results_db_name: String,
 
+    #[arg(
+        long,
+        env = "QUERY_METRICS_DB_NAME",
+        default_value = "sqlite_data/metrics.db"
+    )]
+    pub query_metrics_db_name: String,
+
     // should unset JWT_SECRET env var after loading
     #[arg(
         long,
diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs
index e99bdb6e1..87536688e 100644
--- a/crates/embucketd/src/main.rs
+++ b/crates/embucketd/src/main.rs
@@ -208,6 +208,7 @@ async fn async_main(
             db.clone(),
             opts.query_history_db_name.clone(),
             opts.query_results_db_name.clone(),
+            opts.query_metrics_db_name.clone(),
         )
         .await?,
     );

From d24663dcb748f56606e7cc3cd6a0ca861c0d65e2 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Wed, 5 Nov 2025 16:18:10 +0300
Subject: [PATCH 2/2] clippy

---
 crates/core-executor/src/query.rs | 18 +++++++-----------
 crates/core-metastore/Cargo.toml  |  2 +-
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/crates/core-executor/src/query.rs b/crates/core-executor/src/query.rs
index 92034b195..4da0fa605 100644
--- a/crates/core-executor/src/query.rs
+++ b/crates/core-executor/src/query.rs
@@ -72,15 +72,15 @@ use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
 use datafusion_expr::planner::ContextProvider;
 use datafusion_expr::{
     BinaryExpr, CreateMemoryTable, DdlStatement, Expr as DFExpr, ExprSchemable, Extension,
-    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast,
-    UserDefinedLogicalNode, and, build_join_schema, is_null, lit, when,
+    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast, and,
+    build_join_schema, is_null, lit, when,
 };
 use datafusion_iceberg::DataFusionTable;
 use datafusion_iceberg::catalog::catalog::IcebergCatalog;
 use datafusion_iceberg::catalog::mirror::Mirror;
 use datafusion_iceberg::catalog::schema::IcebergSchema;
 use datafusion_iceberg::table::DataFusionTableConfigBuilder;
-use datafusion_physical_plan::{ExecutionPlan, collect, displayable};
+use datafusion_physical_plan::{ExecutionPlan, collect};
 use df_catalog::catalog::CachingCatalog;
 use df_catalog::catalog_list::CachedEntity;
 use df_catalog::table::CachingTable;
@@ -93,9 +93,7 @@ use embucket_functions::visitors::{
     table_functions_cte_relation, timestamp, top_limit,
     unimplemented::functions_checker::visit as unimplemented_functions_checker,
 };
-use futures::FutureExt;
 use futures::TryStreamExt;
-use futures::future::join_all;
 use iceberg_rust::catalog::Catalog;
 use iceberg_rust::catalog::create::CreateTableBuilder;
 use iceberg_rust::catalog::identifier::Identifier;
@@ -130,7 +128,6 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Duration;
-use tokio::task::AbortHandle;
 use tokio::time::sleep;
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument, error};
@@ -2430,12 +2427,12 @@ impl UserQuery {
         tokio::spawn(async move {
             loop {
                 tokio::select! {
-                    _ = sleep(Duration::from_secs(1)) => {
+                    () = sleep(Duration::from_secs(1)) => {
                         if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
                             error!("Failed to save intermediate plan metrics: {:?}", e);
                         }
                     }
-                    _ = bg_token.cancelled() => {
+                    () = bg_token.cancelled() => {
                         if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
                             error!("Failed to save final plan metrics: {:?}", e);
                         }
@@ -3789,7 +3786,7 @@ pub async fn save_plan_metrics(
     let counter = AtomicUsize::new(0);
     let mut collected = Vec::new();
 
-    collect_plan_metrics(query_id, plan, None, 0, &counter, &mut collected);
+    collect_plan_metrics(query_id, plan, None, &counter, &mut collected);
 
     tracing::debug!(
         "Collected {} metrics from plan, saving to metrics store...",
@@ -3806,7 +3803,6 @@ fn collect_plan_metrics(
     query_id: QueryRecordId,
     plan: &Arc<dyn ExecutionPlan>,
     parent: Option<usize>,
-    level: usize,
     counter: &AtomicUsize,
     out: &mut Vec<QueryMetric>,
 ) {
@@ -3831,6 +3827,6 @@ fn collect_plan_metrics(
     ));
 
     for child in plan.children() {
-        collect_plan_metrics(query_id, &child, Some(node_id), level + 1, counter, out);
+        collect_plan_metrics(query_id, child, Some(node_id), counter, out);
     }
 }
diff --git a/crates/core-metastore/Cargo.toml b/crates/core-metastore/Cargo.toml
index 562298bb4..e8e5c55e4 100644
--- a/crates/core-metastore/Cargo.toml
+++ b/crates/core-metastore/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "core-metastore"
 version = "0.1.0"
-edition = "2021"
+edition = "2024"
 license-file = { workspace = true }
 
 [dependencies]