1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@
# will have compiled files and executables
debug/
target/
sqlite_data/
object_store/
test/dbt_integration_tests/dbt-gitlab/logs/
test/slt_errors_stats_embucket.csv
8 changes: 8 additions & 0 deletions crates/core-executor/src/error.rs
@@ -596,6 +596,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Query History metrics error: {source}"))]
QueryHistoryMetrics {
#[snafu(source(from(core_history::errors::Error, Box::new)))]
source: Box<core_history::errors::Error>,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Query {} cancelled", query_id.as_uuid()))]
QueryCancelled {
query_id: QueryRecordId,
101 changes: 96 additions & 5 deletions crates/core-executor/src/query.rs
@@ -25,7 +25,8 @@ use crate::duckdb::query::{
};
use crate::error::{OperationOn, OperationType};
use crate::models::{QueryContext, QueryResult};
use core_history::HistoryStore;
use core_history::query_metrics::QueryMetric;
use core_history::{HistoryStore, QueryRecordId};
use core_metastore::{
AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
SchemaIdent as MetastoreSchemaIdent, TableCreateRequest as MetastoreTableCreateRequest,
@@ -79,7 +80,7 @@ use datafusion_iceberg::catalog::catalog::IcebergCatalog;
use datafusion_iceberg::catalog::mirror::Mirror;
use datafusion_iceberg::catalog::schema::IcebergSchema;
use datafusion_iceberg::table::DataFusionTableConfigBuilder;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::{ExecutionPlan, collect};
use df_catalog::catalog::CachingCatalog;
use df_catalog::catalog_list::CachedEntity;
use df_catalog::table::CachingTable;
@@ -108,6 +109,7 @@ use iceberg_rust::spec::values::Value as IcebergValue;
use iceberg_rust::table::manifest_list::snapshot_partition_bounds;
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey as S3Key, resolve_bucket_region};
use object_store::{ClientOptions, ObjectStore};
use serde_json::json;
use snafu::{OptionExt, ResultExt, location};
use sqlparser::ast::helpers::key_value_options::KeyValueOptions;
use sqlparser::ast::helpers::stmt_data_loading::StageParamsObject;
@@ -124,7 +126,11 @@ use std::ops::ControlFlow;
use std::result::Result as StdResult;
use std::str::FromStr;
use std::sync::Arc;
use tracing::Instrument;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error};
use tracing_attributes::instrument;
use url::Url;

@@ -2394,6 +2400,7 @@ impl UserQuery {
async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<QueryResult> {
let session = self.session.clone();
let query_id = self.query_context.query_id;
let history_store = Arc::clone(&self.history_store);

let span = tracing::debug_span!("UserQuery::execute_logical_plan");

@@ -2406,11 +2413,42 @@
.ctx
.execute_logical_plan(plan)
.await
.context(ex_error::DataFusionSnafu)?
.collect()
.context(ex_error::DataFusionSnafu)?;

let plan = records
.create_physical_plan()
.await
.context(ex_error::DataFusionSnafu)?;

// Run background job to save physical plan metrics
let token = CancellationToken::new();
let bg_token = token.clone();
let metrics_plan = Arc::clone(&plan);
tokio::spawn(async move {
loop {
tokio::select! {
() = sleep(Duration::from_secs(1)) => {
if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
error!("Failed to save intermediate plan metrics: {:?}", e);
}
}
() = bg_token.cancelled() => {
if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
error!("Failed to save final plan metrics: {:?}", e);
}
break;
}
}
}
});

let task_ctx = session.ctx.task_ctx();
let records = collect(plan, task_ctx)
.instrument(span)
.await
.context(ex_error::DataFusionSnafu)?;
// Stop metrics background job
token.cancel();
if !records.is_empty() {
schema = records[0].schema().as_ref().clone();
}
@@ -3739,3 +3777,56 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
table: Arc::from(table_ref.table.to_ascii_lowercase()),
}
}

pub async fn save_plan_metrics(
history_store: Arc<dyn HistoryStore>,
query_id: QueryRecordId,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<()> {
let counter = AtomicUsize::new(0);
let mut collected = Vec::new();

collect_plan_metrics(query_id, plan, None, &counter, &mut collected);

tracing::debug!(
"Collected {} metrics from plan, saving to metrics store...",
collected.len()
);
history_store
.add_query_metrics_batch(&collected)
.await
.context(ex_error::QueryHistoryMetricsSnafu)
}

/// Recursively collect metrics into a vector (non-async).
fn collect_plan_metrics(
query_id: QueryRecordId,
plan: &Arc<dyn ExecutionPlan>,
parent: Option<usize>,
counter: &AtomicUsize,
out: &mut Vec<QueryMetric>,
) {
let node_id = counter.fetch_add(1, Ordering::SeqCst);

let metrics_json = if let Some(metrics) = plan.metrics() {
json!({
"spill_count": metrics.spill_count(),
"output_rows": metrics.output_rows(),
"elapsed_compute": metrics.elapsed_compute(),
})
} else {
json!({})
};

out.push(QueryMetric::new(
query_id,
node_id,
parent,
plan.name(),
metrics_json,
));

for child in plan.children() {
collect_plan_metrics(query_id, child, Some(node_id), counter, out);
}
}
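
For readers following the control flow in `execute_logical_plan` above: the background task pairs a one-second interval with a `CancellationToken`, so metrics are flushed periodically while `collect` is running and exactly once more after the token is cancelled. A minimal sketch of that pattern in isolation, assuming only `tokio` and `tokio_util`; the `flush` closure is a stand-in for `save_plan_metrics` and is purely illustrative:

```rust
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

// Poll `flush` on a fixed interval until the caller cancels, then flush one
// final time so the last metrics snapshot is persisted.
async fn poll_until_cancelled<F, Fut>(token: CancellationToken, mut flush: F)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    loop {
        tokio::select! {
            // Periodic flush while the query is still executing.
            () = sleep(Duration::from_secs(1)) => flush().await,
            // Final flush once the query has finished and the token is cancelled.
            () = token.cancelled() => {
                flush().await;
                break;
            }
        }
    }
}
```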
2 changes: 2 additions & 0 deletions crates/core-history/src/entities/mod.rs
@@ -1,13 +1,15 @@
pub mod query;
pub mod query_id;
pub mod query_id_param;
pub mod query_metrics;
pub mod result_set;
pub mod worksheet;
pub mod worksheet_query_ref;

pub use query::*;
pub use query_id::*;
pub use query_id_param::*;
pub use query_metrics::*;
pub use result_set::*;
pub use worksheet::*;
pub use worksheet_query_ref::*;
33 changes: 33 additions & 0 deletions crates/core-history/src/entities/query_metrics.rs
@@ -0,0 +1,33 @@
use crate::QueryRecordId;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMetric {
pub query_id: QueryRecordId,
pub node_id: usize,
pub parent_node_id: Option<usize>,
pub operator: String,
pub metrics: serde_json::Value, // serialized metrics as JSON object
pub created_at: DateTime<Utc>,
}

impl QueryMetric {
#[must_use]
pub fn new(
query_id: QueryRecordId,
node_id: usize,
parent_node_id: Option<usize>,
operator: &str,
metrics: serde_json::Value,
) -> Self {
Self {
query_id,
node_id,
parent_node_id,
operator: operator.to_string(),
metrics,
created_at: Utc::now(),
}
}
}
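
A minimal usage sketch for the new entity, assuming the caller already holds a `QueryRecordId` (the type is not constructed here); the operator name and row count are made up for illustration:

```rust
use core_history::{QueryMetric, QueryRecordId};
use serde_json::json;

// Build the metric row for the root node of a plan: node_id 0 with no parent.
fn root_metric(query_id: QueryRecordId, output_rows: usize) -> QueryMetric {
    QueryMetric::new(
        query_id,
        0,                // node_id assigned to the plan root
        None,             // the root has no parent node
        "ProjectionExec", // illustrative operator name
        json!({ "output_rows": output_rows }),
    )
}
```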
7 changes: 7 additions & 0 deletions crates/core-history/src/errors.rs
@@ -76,6 +76,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Error adding query metrics: {source}"))]
MetricsAdd {
source: core_utils::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Error adding query record: {source}"))]
QueryAdd {
source: core_utils::Error,
4 changes: 3 additions & 1 deletion crates/core-history/src/interface.rs
@@ -1,6 +1,6 @@
use crate::ResultSet;
use crate::errors::Result;
use crate::{QueryRecord, QueryRecordId, QueryStatus, Worksheet, WorksheetId};
use crate::{QueryMetric, QueryRecord, QueryRecordId, QueryStatus, Worksheet, WorksheetId};
use async_trait::async_trait;

#[derive(Debug, Clone)]
@@ -84,4 +84,6 @@ pub trait HistoryStore: std::fmt::Debug + Send + Sync {
fn new_query_record(&self, query: &str, worksheet_id: Option<WorksheetId>) -> QueryRecord;
async fn save_query_record(&self, query_record: &QueryRecord, result_set: Option<ResultSet>);
async fn get_query_result(&self, query_record_id: QueryRecordId) -> Result<ResultSet>;
async fn add_query_metrics(&self, metric: &QueryMetric) -> Result<()>;
async fn add_query_metrics_batch(&self, metrics: &[QueryMetric]) -> Result<()>;
}
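
A minimal sketch of how caller-side code might drive the two new trait methods, assuming an `Arc<dyn HistoryStore>` handle and that `core_history::errors::Result` is the crate's public result alias; error handling is left to the caller:

```rust
use std::sync::Arc;

use core_history::errors::Result;
use core_history::{HistoryStore, QueryMetric};

// Persist all metrics collected for one query, preferring a single batched
// store round-trip; a one-element slice takes the single-row path instead.
async fn persist_metrics(store: Arc<dyn HistoryStore>, metrics: &[QueryMetric]) -> Result<()> {
    if let [only] = metrics {
        store.add_query_metrics(only).await
    } else {
        store.add_query_metrics_batch(metrics).await
    }
}
```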