From d37d8981a6a37a92e6cd137aa09ef974c154922f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 2 Sep 2022 18:42:50 +0530 Subject: [PATCH] Use existing schema when querying local data. Datafusion needs a valid schema to execute a query. If no associated schema found for registered table then datafusion tries to infer that schema. If there are no available listings from which schema can be derived then it fails and returns an error. This causes entire query to fail and return error response. This is fixed by giving it a proper schema to work with that is already part of metadata. There are however ways that this schema is might not be available ( maybe first event has happened yet), then we simply return doing no changes to RecordBatch. Changes to be committed: modified: server/src/query.rs --- server/src/query.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/query.rs b/server/src/query.rs index 1bdc44180..09fb6df54 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -17,6 +17,7 @@ */ use chrono::{DateTime, Utc}; +use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; @@ -24,6 +25,7 @@ use datafusion::prelude::*; use serde_json::Value; use std::sync::Arc; +use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage; use crate::storage::ObjectStorage; @@ -92,6 +94,14 @@ impl Query { target_partitions: 1, }; + let schema = &STREAM_INFO.schema(&self.stream_name)?; + + if schema.is_empty() { + return Ok(()); + } + + let schema: Arc = Arc::new(serde_json::from_str(schema)?); + ctx.register_listing_table( &self.stream_name, CONFIG @@ -100,7 +110,7 @@ impl Query { .to_str() .unwrap(), listing_options, - None, + Some(schema), ) .await?;