diff --git a/server/src/query.rs b/server/src/query.rs index 1bdc44180..09fb6df54 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -17,6 +17,7 @@ */ use chrono::{DateTime, Utc}; +use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; @@ -24,6 +25,7 @@ use datafusion::prelude::*; use serde_json::Value; use std::sync::Arc; +use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage; use crate::storage::ObjectStorage; @@ -92,6 +94,14 @@ impl Query { target_partitions: 1, }; + let schema = &STREAM_INFO.schema(&self.stream_name)?; + + if schema.is_empty() { + return Ok(()); + } + + let schema: Arc = Arc::new(serde_json::from_str(schema)?); + ctx.register_listing_table( &self.stream_name, CONFIG @@ -100,7 +110,7 @@ impl Query { .to_str() .unwrap(), listing_options, - None, + Some(schema), ) .await?;