diff --git a/server/Cargo.toml b/server/Cargo.toml
index 27523eac4..ab175a7c9 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -17,7 +17,7 @@ aws-sdk-s3 = "0.19"
aws-smithy-async = { version = "0.49.0", features = ["rt-tokio"] }
base64 = "0.20.0"
bytes = "1"
-chrono = "0.4.19"
+chrono = "0.4.23"
chrono-humanize = "0.2.2"
clap = { version = "4.0.32", default-features = false, features = [
"std",
@@ -49,7 +49,7 @@ rustls = "0.20.6"
rustls-pemfile = "1.0.1"
rust-flatten-json = "0.2.0"
semver = "1.0.14"
-serde = "^1.0.8"
+serde = { version = "1.0", features = ["rc"] }
serde_derive = "^1.0.8"
serde_json = "^1.0.8"
thiserror = "1"
@@ -65,6 +65,9 @@ actix-web-static-files = "4.0"
static-files = "0.2.1"
ulid = { version = "1.0", features = ["serde"] }
ureq = { version = "2.5.0", features = ["json"] }
+hex = "0.4.3"
+itertools = "0.10.5"
+xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
[build-dependencies]
static-files = "0.2.1"
@@ -77,7 +80,6 @@ zip = { version = "0.6.3", default_features = false, features = ["deflate"] }
[dev-dependencies]
maplit = "1.0.2"
rstest = "0.15.0"
-serial_test = { version = "0.9.0", default-features = false }
[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.0.8/build.zip"
diff --git a/server/src/event.rs b/server/src/event.rs
index 1dcd10bf0..0941e6a0f 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -1,224 +1,77 @@
/*
- * Parseable Server (C) 2022 - 2023 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- *
- */
-use actix_web::rt::spawn;
+* Parseable Server (C) 2022 - 2023 Parseable, Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <https://www.gnu.org/licenses/>.
+*
+*
+*/
+
+mod writer;
+
use arrow_schema::{DataType, Field, TimeUnit};
-use chrono::{DateTime, Utc};
+use chrono::Utc;
use datafusion::arrow::array::{Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::ArrowError;
-use datafusion::arrow::ipc::writer::StreamWriter;
-
use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions};
use datafusion::arrow::record_batch::RecordBatch;
-use lazy_static::lazy_static;
use serde_json::Value;
+
use std::collections::HashMap;
-use std::fs::OpenOptions;
-use std::ops::{Deref, DerefMut};
+use std::ops::DerefMut;
use std::sync::Arc;
-use std::sync::Mutex;
-use std::sync::MutexGuard;
-use std::sync::RwLock;
use crate::metadata;
use crate::metadata::LOCK_EXPECT;
use crate::option::CONFIG;
-use crate::storage::StorageDir;
-use self::error::{EventError, StreamWriterError};
-
-type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
-type LocalWriterGuard<'a> = MutexGuard<'a, Option<StreamWriter<std::fs::File>>>;
+use self::error::EventError;
+pub use self::writer::STREAM_WRITERS;
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
-const TIME_KEYS: &[&str] = &["time", "date", "datetime", "timestamp"];
-
-lazy_static! {
- #[derive(Default)]
- pub static ref STREAM_WRITERS: RwLock<HashMap<String, LocalWriter>> = RwLock::new(HashMap::new());
-}
-
-impl STREAM_WRITERS {
- // append to a existing stream
- fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> {
- let hashmap_guard = STREAM_WRITERS
- .read()
- .map_err(|_| StreamWriterError::RwPoisoned)?;
-
- match hashmap_guard.get(stream) {
- Some(localwriter) => {
- let mut writer_guard = localwriter
- .lock()
- .map_err(|_| StreamWriterError::MutexPoisoned)?;
-
- // if it's some writer then we write without dropping any lock
- // hashmap cannot be brought mutably at any point until this finishes
- if let Some(ref mut writer) = *writer_guard {
- writer.write(record).map_err(StreamWriterError::Writer)?;
- } else {
- // pass on this mutex to set entry so that it can be reused
- // we have a guard for underlying entry thus
- // hashmap must not be availible as mutable to any other thread
- STREAM_WRITERS::set_entry(writer_guard, stream, record)?;
- }
- }
- // entry is not present thus we create it
- None => {
- // this requires mutable borrow of the map so we drop this read lock and wait for write lock
- drop(hashmap_guard);
- STREAM_WRITERS::create_entry(stream.to_string(), record)?;
- }
- };
- Ok(())
- }
-
- // create a new entry with new stream_writer
- // Only create entry for valid streams
- fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> {
- let mut hashmap_guard = STREAM_WRITERS
- .write()
- .map_err(|_| StreamWriterError::RwPoisoned)?;
-
- let stream_writer = init_new_stream_writer_file(&stream, record)?;
-
- hashmap_guard.insert(stream, Mutex::new(Some(stream_writer)));
-
- Ok(())
- }
-
- // Deleting a logstream requires that metadata is deleted first
- pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> {
- let mut hashmap_guard = STREAM_WRITERS
- .write()
- .map_err(|_| StreamWriterError::RwPoisoned)?;
-
- hashmap_guard.remove(stream);
-
- Ok(())
- }
-
- fn set_entry(
- mut writer_guard: LocalWriterGuard,
- stream: &str,
- record: &RecordBatch,
- ) -> Result<(), StreamWriterError> {
- let stream_writer = init_new_stream_writer_file(stream, record)?;
-
- writer_guard.replace(stream_writer); // replace the stream writer behind this mutex
-
- Ok(())
- }
-
- pub fn unset_all() -> Result<(), StreamWriterError> {
- let map = STREAM_WRITERS
- .read()
- .map_err(|_| StreamWriterError::RwPoisoned)?;
-
- for writer in map.values() {
- if let Some(mut streamwriter) = writer
- .lock()
- .map_err(|_| StreamWriterError::MutexPoisoned)?
- .take()
- {
- let _ = streamwriter.finish();
- }
- }
-
- Ok(())
- }
-}
-
-fn init_new_stream_writer_file(
- stream_name: &str,
- record: &RecordBatch,
-) -> Result<StreamWriter<std::fs::File>, StreamWriterError> {
- let dir = StorageDir::new(stream_name);
- let path = dir.path_by_current_time();
-
- std::fs::create_dir_all(dir.data_path)?;
-
- let file = OpenOptions::new().create(true).append(true).open(path)?;
-
- let mut stream_writer = StreamWriter::try_new(file, &record.schema())
- .expect("File and RecordBatch both are checked");
-
- stream_writer
- .write(record)
- .map_err(StreamWriterError::Writer)?;
-
- Ok(stream_writer)
-}
#[derive(Clone)]
pub struct Event {
pub body: Value,
pub stream_name: String,
+ pub schema_key: String,
}
// Events holds the schema related to a each event for a single log stream
impl Event {
pub async fn process(self) -> Result<(), EventError> {
- let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
+ let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name, &self.schema_key)?;
if let Some(schema) = stream_schema {
- let schema_ref = Arc::new(schema);
// validate schema before processing the event
- let Ok(mut event) = self.get_record(schema_ref.clone()) else {
+ let Ok(mut event) = self.get_record(Arc::clone(&schema)) else {
return Err(EventError::SchemaMismatch);
};
- if event
- .schema()
- .column_with_name(DEFAULT_TIMESTAMP_KEY)
- .is_some()
- {
- let rows = event.num_rows();
- let timestamp_array = Arc::new(get_timestamp_array(rows));
- event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array);
- }
+ let rows = event.num_rows();
+ let timestamp_array = Arc::new(get_timestamp_array(rows));
+ event = replace(schema, event, DEFAULT_TIMESTAMP_KEY, timestamp_array);
self.process_event(&event)?;
} else {
// if stream schema is none then it is first event,
// process first event and store schema in obect store
- // check for a possible datetime field
- let time_field = get_datetime_field(&self.body);
-
- if time_field.is_none() {
- let schema = Schema::try_merge(vec![
- Schema::new(vec![Field::new(
- DEFAULT_TIMESTAMP_KEY,
- DataType::Timestamp(TimeUnit::Millisecond, None),
- true,
- )]),
- self.infer_schema()?,
- ])?;
- let schema_ref = Arc::new(schema.clone());
- let event = self.get_record(schema_ref.clone())?;
- let timestamp_array = Arc::new(get_timestamp_array(event.num_rows()));
- let event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array);
- self.process_first_event(&event, schema)?;
- } else {
- let schema = self.infer_schema()?;
- let schema_ref = Arc::new(schema.clone());
- let event = self.get_record(schema_ref)?;
- self.process_first_event(&event, schema)?;
- }
+ let schema = add_default_timestamp_field(self.infer_schema()?)?;
+ let schema_ref = Arc::new(schema.clone());
+ let event = self.get_record(schema_ref.clone())?;
+ let timestamp_array = Arc::new(get_timestamp_array(event.num_rows()));
+ let event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array);
+ self.process_first_event(&event, schema)?;
};
metadata::STREAM_INFO.update_stats(
@@ -246,60 +99,16 @@ impl Event {
// - map always have an entry for this stream
let stream_name = &self.stream_name;
+ let schema_key = &self.schema_key;
- let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT);
- // if the metadata is not none after acquiring lock
- // then some other thread has already completed this function.
- if _schema_with_map(stream_name, &stream_metadata).is_some() {
- // drop the lock
- drop(stream_metadata);
- // Try to post event usual way
- log::info!("first event is redirected to process_event");
- self.process_event(event)
- } else {
- // stream metadata is still none,
- // this means this execution should be considered as first event.
-
- // Store record batch on local cache
- log::info!("creating local writer for this first event");
- self.process_event(event)?;
-
- log::info!("schema is set in memory map for logstream {}", stream_name);
- _set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata);
- // drop mutex before going across await point
- drop(stream_metadata);
-
- log::info!(
- "setting schema on objectstore for logstream {}",
- stream_name
- );
- let storage = CONFIG.storage().get_object_store();
-
- let stream_name = stream_name.clone();
- spawn(async move {
- if let Err(e) = storage.put_schema(&stream_name, &schema).await {
- // If this call has failed then currently there is no right way to make local state consistent
- // this needs a fix after more constraints are safety guarentee is provided by localwriter and objectstore_sync.
- // Reasoning -
- // - After dropping lock many events may process through
- // - Processed events may sync before metadata deletion
- log::error!(
- "Parseable failed to upload schema to objectstore due to error {}",
- e
- );
- log::error!("Please manually delete this logstream and create a new one.");
- metadata::STREAM_INFO.delete_stream(&stream_name);
- }
- });
-
- Ok(())
- }
+ commit_schema(stream_name, schema_key, Arc::new(schema))?;
+ self.process_event(event)
}
// event process all events after the 1st event. Concatenates record batches
// and puts them in memory store for each event.
fn process_event(&self, rb: &RecordBatch) -> Result<(), EventError> {
- STREAM_WRITERS::append_to_local(&self.stream_name, rb)?;
+ STREAM_WRITERS::append_to_local(&self.stream_name, &self.schema_key, rb)?;
Ok(())
}
@@ -321,6 +130,30 @@ impl Event {
}
}
+fn add_default_timestamp_field(schema: Schema) -> Result<Schema, ArrowError> {
+ let schema = Schema::try_merge(vec![
+ Schema::new(vec![Field::new(
+ DEFAULT_TIMESTAMP_KEY,
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ true,
+ )]),
+ schema,
+ ])?;
+
+ Ok(schema)
+}
+
+pub fn get_schema_key(body: &Value) -> String {
+ let mut list_of_fields: Vec<_> = body.as_object().unwrap().keys().collect();
+ list_of_fields.sort();
+ let mut hasher = xxhash_rust::xxh3::Xxh3::new();
+ for field in list_of_fields {
+ hasher.update(field.as_bytes())
+ }
+ let hash = hasher.digest();
+ format!("{hash:x}")
+}
+
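// Illustrative sketch, not part of the diff: get_schema_key is insensitive to JSON key
// order because field names are sorted before hashing with xxh3, so bodies that share a
// field set land in the same schema bucket. Assumes only serde_json and the function above.
#[cfg(test)]
mod schema_key_sketch {
    use serde_json::json;

    #[test]
    fn same_fields_produce_same_key() {
        let a = json!({"level": "info", "msg": "ok"});
        let b = json!({"msg": "ok", "level": "info"});
        assert_eq!(super::get_schema_key(&a), super::get_schema_key(&b));

        // a different field set hashes to a different key
        let c = json!({"level": "info", "msg": "ok", "latency": 12});
        assert_ne!(super::get_schema_key(&a), super::get_schema_key(&c));
    }
}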
fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
for (name, val) in body.as_object().expect("body is of object variant") {
let Ok(field) = schema.field_with_name(name) else { return true };
@@ -345,13 +178,58 @@ fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
false
}
+fn commit_schema(
+ stream_name: &str,
+ schema_key: &str,
+ schema: Arc<Schema>,
+) -> Result<(), EventError> {
+ // note for methods .get_unchecked and .set_unchecked,
+ // these are to be called while holding a write lock specifically.
+ // this guarantees two things
+ // - no other metadata operation can happen in between
+ // - map always has an entry for this stream
+
+ let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT);
+ // if the metadata is not none after acquiring lock
+ // then some other thread has already completed this function.
+ if stream_metadata
+ .get_unchecked(stream_name, schema_key)
+ .is_some()
+ {
+ // drop the lock
+ drop(stream_metadata);
+ // Nothing to do
+ Ok(())
+ } else {
+ // set to map
+ stream_metadata.set_unchecked(stream_name, schema_key, schema);
+ // serialize map
+ let schema_map = serde_json::to_string(
+ &stream_metadata
+ .get(stream_name)
+ .expect("map has entry for this stream name")
+ .schema,
+ )
+ .expect("map of schemas is serializable");
+ // try to put to storage
+ let storage = CONFIG.storage().get_object_store();
+ let res = futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map));
+ // revert if err
+ if res.is_err() {
+ stream_metadata.remove_unchecked(stream_name, schema_key)
+ }
+ // return result
+ res.map_err(|err| err.into())
+ }
+}
+
fn replace(
schema: Arc<Schema>,
batch: RecordBatch,
column: &str,
arr: Arc,
) -> RecordBatch {
- let index = schema.column_with_name(column).unwrap().0;
+ let (index, _) = schema.column_with_name(column).unwrap();
let mut arrays = batch.columns().to_vec();
arrays[index] = arr;
@@ -363,50 +241,41 @@ fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
}
-fn get_datetime_field(json: &Value) -> Option<&str> {
- let Value::Object(object) = json else { panic!() };
- for (key, value) in object {
- if TIME_KEYS.contains(&key.as_str()) {
- if let Value::String(maybe_datetime) = value {
- if DateTime::parse_from_rfc3339(maybe_datetime).is_ok() {
- return Some(key);
- }
- }
- }
+trait UncheckedOp: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>> {
+ fn get_unchecked(&self, stream_name: &str, schema_key: &str) -> Option<Arc<Schema>> {
+ self.get(stream_name)
+ .expect("map has entry for this stream name")
+ .schema
+ .get(schema_key)
+ .cloned()
}
- None
-}
-// Special functions which reads from metadata map while holding the lock
-#[inline]
-pub fn _schema_with_map(
- stream_name: &str,
- map: &impl Deref<Target = HashMap<String, metadata::LogStreamMetadata>>,
-) -> Option<Schema> {
- map.get(stream_name)
- .expect("map has entry for this stream name")
- .schema
- .to_owned()
-}
+ fn set_unchecked(&mut self, stream_name: &str, schema_key: &str, schema: Arc<Schema>) {
+ self.get_mut(stream_name)
+ .expect("map has entry for this stream name")
+ .schema
+ .insert(schema_key.to_string(), schema)
+ .is_some()
+ .then(|| panic!("collision"));
+ }
-#[inline]
-// Special functions which writes to metadata map while holding the lock
-pub fn _set_schema_with_map(
- stream_name: &str,
- schema: Schema,
- map: &mut impl DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>,
-) {
- map.get_mut(stream_name)
- .expect("map has entry for this stream name")
- .schema
- .replace(schema);
+ fn remove_unchecked(&mut self, stream_name: &str, schema_key: &str) {
+ self.get_mut(stream_name)
+ .expect("map has entry for this stream name")
+ .schema
+ .remove(schema_key);
+ }
}
+impl<T: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>> UncheckedOp for T {}
+
pub mod error {
use crate::metadata::error::stream_info::MetadataError;
use crate::storage::ObjectStorageError;
use datafusion::arrow::error::ArrowError;
+ use super::writer::errors::StreamWriterError;
+
#[derive(Debug, thiserror::Error)]
pub enum EventError {
#[error("Missing Record from event body")]
@@ -422,18 +291,6 @@ pub mod error {
#[error("Schema Mismatch: {0}")]
ObjectStorage(#[from] ObjectStorageError),
}
-
- #[derive(Debug, thiserror::Error)]
- pub enum StreamWriterError {
- #[error("Arrow writer failed: {0}")]
- Writer(#[from] ArrowError),
- #[error("Io Error when creating new file: {0}")]
- Io(#[from] std::io::Error),
- #[error("RwLock was poisoned")]
- RwPoisoned,
- #[error("Mutex was poisoned")]
- MutexPoisoned,
- }
}
#[cfg(test)]
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
new file mode 100644
index 000000000..c69832207
--- /dev/null
+++ b/server/src/event/writer.rs
@@ -0,0 +1,204 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ */
+
+use datafusion::arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
+use lazy_static::lazy_static;
+use std::borrow::Borrow;
+use std::collections::HashMap;
+use std::fs::{File, OpenOptions};
+use std::io::Write;
+use std::sync::{Mutex, RwLock};
+
+use crate::storage::StorageDir;
+
+use self::errors::StreamWriterError;
+
+type ArrowWriter<T> = StreamWriter<T>;
+type LocalWriter<T> = Mutex<Option<ArrowWriter<T>>>;
+
+lazy_static! {
+ #[derive(Default)]
+ pub static ref STREAM_WRITERS: RwLock<WriterTable<String, String, File>> = RwLock::new(WriterTable::new());
+}
+
+impl STREAM_WRITERS {
+ // append to an existing stream
+ pub fn append_to_local(
+ stream: &str,
+ schema_key: &str,
+ record: &RecordBatch,
+ ) -> Result<(), StreamWriterError> {
+ let hashmap_guard = STREAM_WRITERS
+ .read()
+ .map_err(|_| StreamWriterError::RwPoisoned)?;
+
+ match hashmap_guard.get(stream, schema_key) {
+ Some(localwriter) => {
+ let mut writer_guard = localwriter
+ .lock()
+ .map_err(|_| StreamWriterError::MutexPoisoned)?;
+
+ // if it's some writer then we write without dropping any lock
+ // hashmap cannot be brought mutably at any point until this finishes
+ if let Some(ref mut writer) = *writer_guard {
+ writer.write(record).map_err(StreamWriterError::Writer)?;
+ } else {
+ // pass on this mutex to set entry so that it can be reused
+ // we have a guard for underlying entry thus
+ // hashmap must not be available as mutable to any other thread
+ let writer = init_new_stream_writer_file(stream, schema_key, record)?;
+ writer_guard.replace(writer); // replace the stream writer behind this mutex
+ }
+ }
+ // entry is not present thus we create it
+ None => {
+ // this requires mutable borrow of the map so we drop this read lock and wait for write lock
+ drop(hashmap_guard);
+ STREAM_WRITERS::create_entry(stream.to_owned(), schema_key.to_owned(), record)?;
+ }
+ };
+ Ok(())
+ }
+
+ // create a new entry with new stream_writer
+ // Only create entry for valid streams
+ fn create_entry(
+ stream: String,
+ schema_key: String,
+ record: &RecordBatch,
+ ) -> Result<(), StreamWriterError> {
+ let mut hashmap_guard = STREAM_WRITERS
+ .write()
+ .map_err(|_| StreamWriterError::RwPoisoned)?;
+
+ let writer = init_new_stream_writer_file(&stream, &schema_key, record)?;
+
+ hashmap_guard.insert(stream, schema_key, Mutex::new(Some(writer)));
+
+ Ok(())
+ }
+
+ pub fn delete_stream(stream: &str) {
+ STREAM_WRITERS.write().unwrap().delete_stream(stream);
+ }
+
+ pub fn unset_all() -> Result<(), StreamWriterError> {
+ let table = STREAM_WRITERS
+ .read()
+ .map_err(|_| StreamWriterError::RwPoisoned)?;
+
+ for writer in table.iter() {
+ if let Some(mut streamwriter) = writer
+ .lock()
+ .map_err(|_| StreamWriterError::MutexPoisoned)?
+ .take()
+ {
+ let _ = streamwriter.finish();
+ }
+ }
+
+ Ok(())
+ }
+}
+
+pub struct WriterTable<A, B, T>
+where
+ A: Eq + std::hash::Hash,
+ B: Eq + std::hash::Hash,
+ T: Write,
+{
+ table: HashMap<A, HashMap<B, LocalWriter<T>>>,
+}
+
+impl<A, B, T> WriterTable<A, B, T>
+where
+ A: Eq + std::hash::Hash,
+ B: Eq + std::hash::Hash,
+ T: Write,
+{
+ pub fn new() -> Self {
+ let table = HashMap::new();
+ Self { table }
+ }
+
+ fn get<X, Y>(&self, a: &X, b: &Y) -> Option<&LocalWriter<T>>
+ where
+ A: Borrow<X>,
+ B: Borrow<Y>,
+ X: Eq + std::hash::Hash + ?Sized,
+ Y: Eq + std::hash::Hash + ?Sized,
+ {
+ self.table.get(a)?.get(b)
+ }
+
+ fn insert(&mut self, a: A, b: B, v: LocalWriter<T>) {
+ let inner = self.table.entry(a).or_default();
+ inner.insert(b, v);
+ }
+
+ pub fn delete_stream<X>(&mut self, stream: &X)
+ where
+ A: Borrow<X>,
+ X: Eq + std::hash::Hash + ?Sized,
+ {
+ self.table.remove(stream);
+ }
+
+ fn iter(&self) -> impl Iterator<Item = &LocalWriter<T>> {
+ self.table.values().flat_map(|inner| inner.values())
+ }
+}
+
+fn init_new_stream_writer_file(
+ stream_name: &str,
+ schema_key: &str,
+ record: &RecordBatch,
+) -> Result<StreamWriter<File>, StreamWriterError> {
+ let dir = StorageDir::new(stream_name);
+ let path = dir.path_by_current_time(schema_key);
+
+ std::fs::create_dir_all(dir.data_path)?;
+
+ let file = OpenOptions::new().create(true).append(true).open(path)?;
+
+ let mut stream_writer = StreamWriter::try_new(file, &record.schema())
+ .expect("File and RecordBatch both are checked");
+
+ stream_writer
+ .write(record)
+ .map_err(StreamWriterError::Writer)?;
+
+ Ok(stream_writer)
+}
+
+pub mod errors {
+ use arrow_schema::ArrowError;
+
+ #[derive(Debug, thiserror::Error)]
+ pub enum StreamWriterError {
+ #[error("Arrow writer failed: {0}")]
+ Writer(#[from] ArrowError),
+ #[error("Io Error when creating new file: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("RwLock was poisoned")]
+ RwPoisoned,
+ #[error("Mutex was poisoned")]
+ MutexPoisoned,
+ }
+}
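// Illustrative sketch, not part of the diff: WriterTable is a two-level map of
// stream name -> schema key -> Mutex<Option<StreamWriter<T>>>, so one log stream keeps a
// separate Arrow writer per schema variant. The schema keys below are arbitrary
// placeholders and Vec<u8> stands in for the file sink.
#[cfg(test)]
mod writer_table_sketch {
    use super::WriterTable;
    use std::sync::Mutex;

    #[test]
    fn entries_are_grouped_per_stream_and_schema() {
        let mut table: WriterTable<String, String, Vec<u8>> = WriterTable::new();
        table.insert("app_logs".to_string(), "key_a".to_string(), Mutex::new(None));
        table.insert("app_logs".to_string(), "key_b".to_string(), Mutex::new(None));
        assert!(table.get("app_logs", "key_a").is_some());
        assert!(table.get("app_logs", "key_b").is_some());

        // deleting the stream drops every schema variant at once
        table.delete_stream("app_logs");
        assert!(table.get("app_logs", "key_a").is_none());
    }
}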
diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs
index af85d6f60..6daf8dbd1 100644
--- a/server/src/handlers/event.rs
+++ b/server/src/handlers/event.rs
@@ -24,7 +24,7 @@ use crate::option::CONFIG;
use crate::query::Query;
use crate::response::QueryResponse;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
-use crate::utils::{flatten_json_body, merge};
+use crate::utils::json::{flatten_json_body, merge};
use self::error::{PostError, QueryError};
@@ -39,7 +39,7 @@ pub async fn query(_req: HttpRequest, json: web::Json) -> Result::into)
@@ -103,10 +103,12 @@ async fn push_logs(
for mut body in array {
merge(&mut body, tags_n_metadata.clone().into_iter());
let body = flatten_json_body(&body).unwrap();
+ let schema_key = event::get_schema_key(&body);
let event = event::Event {
body,
stream_name: stream_name.clone(),
+ schema_key,
};
event.process().await?;
@@ -114,10 +116,12 @@ async fn push_logs(
}
mut body @ Value::Object(_) => {
merge(&mut body, tags_n_metadata.into_iter());
-
+ let body = flatten_json_body(&body).unwrap();
+ let schema_key = event::get_schema_key(&body);
let event = event::Event {
- body: flatten_json_body(&body).unwrap(),
+ body,
stream_name,
+ schema_key,
};
event.process().await?;
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index aad2de257..57345c09e 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -25,8 +25,9 @@ use serde_json::Value;
use crate::alerts::Alerts;
use crate::event;
+use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
-use crate::storage::{ObjectStorageError, StorageDir};
+use crate::storage::{LogStream, StorageDir};
use crate::{metadata, validator};
use self::error::StreamError;
@@ -37,19 +38,13 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let objectstore = CONFIG.storage().get_object_store();
- if objectstore.get_schema(&stream_name).await.is_err() {
+ if !objectstore.stream_exists(&stream_name).await? {
return Err(StreamError::StreamNotFound(stream_name.to_string()));
}
objectstore.delete_stream(&stream_name).await?;
metadata::STREAM_INFO.delete_stream(&stream_name);
-
- if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() {
- log::warn!(
- "failed to delete log stream event writers for stream {}",
- stream_name
- )
- }
+ event::STREAM_WRITERS::delete_stream(&stream_name);
let stream_dir = StorageDir::new(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
@@ -64,32 +59,19 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
}
pub async fn list(_: HttpRequest) -> impl Responder {
- let body = CONFIG
- .storage()
- .get_object_store()
+ let res: Vec<LogStream> = STREAM_INFO
.list_streams()
- .await
- .unwrap();
- web::Json(body)
+ .into_iter()
+ .map(|stream| LogStream { name: stream })
+ .collect();
+
+ web::Json(res)
}
pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
- match metadata::STREAM_INFO.schema(&stream_name) {
- Ok(schema) => Ok((web::Json(schema), StatusCode::OK)),
- Err(_) => match CONFIG
- .storage()
- .get_object_store()
- .get_schema(&stream_name)
- .await
- {
- Ok(Some(schema)) => Ok((web::Json(Some(schema)), StatusCode::OK)),
- Ok(None) => Err(StreamError::UninitializedLogstream),
- Err(ObjectStorageError::NoSuchKey(_)) => Err(StreamError::StreamNotFound(stream_name)),
- Err(err) => Err(err.into()),
- },
- }
+ let schema = STREAM_INFO.merged_schema(&stream_name)?;
+ Ok((web::Json(schema), StatusCode::OK))
}
pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError> {
@@ -164,19 +146,15 @@ pub async fn put_alert(
validator::alert(&alerts)?;
- match metadata::STREAM_INFO.schema(&stream_name) {
- Ok(Some(schema)) => {
- let invalid_alert = alerts
- .alerts
- .iter()
- .find(|alert| !alert.rule.valid_for_schema(&schema));
+ if !STREAM_INFO.stream_initialized(&stream_name)? {
+ return Err(StreamError::UninitializedLogstream);
+ }
- if let Some(alert) = invalid_alert {
- return Err(StreamError::InvalidAlert(alert.name.to_string()));
- }
+ let schema = STREAM_INFO.merged_schema(&stream_name)?;
+ for alert in &alerts.alerts {
+ if !alert.rule.valid_for_schema(&schema) {
+ return Err(StreamError::InvalidAlert(alert.name.to_owned()));
}
- Ok(None) => return Err(StreamError::UninitializedLogstream),
- Err(_) => return Err(StreamError::StreamNotFound(stream_name)),
}
CONFIG
@@ -255,7 +233,7 @@ pub async fn create_stream(stream_name: String) -> Result<(), StreamError> {
status: StatusCode::INTERNAL_SERVER_ERROR,
});
}
- metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
+ metadata::STREAM_INFO.add_stream(stream_name.to_string());
Ok(())
}
@@ -266,6 +244,7 @@ pub mod error {
use http::StatusCode;
use crate::{
+ metadata::error::stream_info::MetadataError,
storage::ObjectStorageError,
validator::error::{AlertValidationError, StreamNameValidationError},
};
@@ -316,4 +295,12 @@ pub mod error {
.body(self.to_string())
}
}
+
+ impl From<MetadataError> for StreamError {
+ fn from(value: MetadataError) -> Self {
+ match value {
+ MetadataError::StreamMetaNotFound(s) => StreamError::StreamNotFound(s),
+ }
+ }
+ }
}
diff --git a/server/src/main.rs b/server/src/main.rs
index 4332fc256..e59291331 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -26,24 +26,25 @@ use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
use log::warn;
use rustls::{Certificate, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, pkcs8_private_keys};
-use std::env;
use thread_priority::{ThreadBuilder, ThreadPriority};
+use tokio::sync::oneshot;
+use tokio::sync::oneshot::error::TryRecvError;
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
+use std::env;
use std::fs::File;
use std::io::BufReader;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::thread::{self, JoinHandle};
use std::time::Duration;
-use tokio::sync::oneshot;
-use tokio::sync::oneshot::error::TryRecvError;
mod alerts;
mod banner;
mod event;
mod handlers;
mod metadata;
+mod migration;
mod option;
mod query;
mod response;
@@ -54,7 +55,6 @@ mod validator;
use option::CONFIG;
-// Global configurations
const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
const API_BASE_PATH: &str = "/api";
const API_VERSION: &str = "v1";
@@ -69,9 +69,12 @@ async fn main() -> anyhow::Result<()> {
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, metadata);
+ migration::run_migration(&CONFIG).await?;
+
if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
warn!("could not populate local metadata. {:?}", e);
}
+
// track all parquet files already in the data directory
storage::CACHED_FILES.track_parquet();
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 075755bbd..ee4184215 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -19,7 +19,7 @@
use datafusion::arrow::datatypes::Schema;
use lazy_static::lazy_static;
use std::collections::HashMap;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
use crate::alerts::Alerts;
use crate::event::Event;
@@ -28,13 +28,7 @@ use crate::storage::ObjectStorage;
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
-#[derive(Debug, Default)]
-pub struct LogStreamMetadata {
- pub schema: Option<Schema>,
- pub alerts: Alerts,
- pub stats: StatsCounter,
-}
-
+// TODO: make return type be of 'static lifetime instead of cloning
lazy_static! {
#[derive(Debug)]
// A read-write lock to allow multiple reads while and isolated write
@@ -42,6 +36,13 @@ lazy_static! {
RwLock::new(HashMap::new());
}
+#[derive(Debug, Default)]
+pub struct LogStreamMetadata {
+ pub schema: HashMap<String, Arc<Schema>>,
+ pub alerts: Alerts,
+ pub stats: StatsCounter,
+}
+
// It is very unlikely that panic will occur when dealing with metadata.
pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding a lock";
@@ -68,26 +69,52 @@ impl STREAM_INFO {
Ok(())
}
- #[allow(dead_code)]
- pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), MetadataError> {
- let mut map = self.write().expect(LOCK_EXPECT);
- map.get_mut(stream_name)
- .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
- .map(|metadata| {
- metadata.schema.replace(schema);
- })
- }
-
pub fn stream_exists(&self, stream_name: &str) -> bool {
let map = self.read().expect(LOCK_EXPECT);
map.contains_key(stream_name)
}
- pub fn schema(&self, stream_name: &str) -> Result<Option<Schema>, MetadataError> {
+ pub fn stream_initialized(&self, stream_name: &str) -> Result<bool, MetadataError> {
+ Ok(!self.schema_map(stream_name)?.is_empty())
+ }
+
+ pub fn schema(
+ &self,
+ stream_name: &str,
+ schema_key: &str,
+ ) -> Result<Option<Arc<Schema>>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
- map.get(stream_name)
+ let schemas = map
+ .get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
- .map(|metadata| metadata.schema.to_owned())
+ .map(|metadata| &metadata.schema)?;
+
+ Ok(schemas.get(schema_key).cloned())
+ }
+
+ pub fn schema_map(
+ &self,
+ stream_name: &str,
+ ) -> Result<HashMap<String, Arc<Schema>>, MetadataError> {
+ let map = self.read().expect(LOCK_EXPECT);
+ let schemas = map
+ .get(stream_name)
+ .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+ .map(|metadata| metadata.schema.clone())?;
+
+ Ok(schemas)
+ }
+
+ pub fn merged_schema(&self, stream_name: &str) -> Result<Schema, MetadataError> {
+ let map = self.read().expect(LOCK_EXPECT);
+ let schemas = &map
+ .get(stream_name)
+ .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?
+ .schema;
+ let schema = Schema::try_merge(schemas.values().map(|schema| &**schema).cloned())
+ .expect("mergeable schemas");
+
+ Ok(schema)
}
pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
@@ -99,11 +126,9 @@ impl STREAM_INFO {
})
}
- pub fn add_stream(&self, stream_name: String, schema: Option<Schema>, alerts: Alerts) {
+ pub fn add_stream(&self, stream_name: String) {
let mut map = self.write().expect(LOCK_EXPECT);
let metadata = LogStreamMetadata {
- schema,
- alerts,
..Default::default()
};
map.insert(stream_name, metadata);
@@ -122,7 +147,7 @@ impl STREAM_INFO {
for stream in storage.list_streams().await? {
let alerts = storage.get_alerts(&stream.name).await?;
- let schema = storage.get_schema(&stream.name).await?;
+ let schema = storage.get_schema_map(&stream.name).await?;
let stats = storage.get_stats(&stream.name).await?;
let metadata = LogStreamMetadata {
diff --git a/server/src/migration.rs b/server/src/migration.rs
new file mode 100644
index 000000000..67f38670a
--- /dev/null
+++ b/server/src/migration.rs
@@ -0,0 +1,72 @@
+/*
+* Parseable Server (C) 2022 - 2023 Parseable, Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <https://www.gnu.org/licenses/>.
+*
+*
+*/
+
+mod schema_migration;
+mod stream_metadata_migration;
+
+use bytes::Bytes;
+use relative_path::RelativePathBuf;
+use serde::Serialize;
+
+use crate::{option::Config, storage::ObjectStorage};
+
+pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
+ let storage = config.storage().get_object_store();
+ let streams = storage.list_streams().await?;
+
+ for stream in streams {
+ migration_stream(&stream.name, &*storage).await?
+ }
+
+ Ok(())
+}
+
+async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
+ let path = RelativePathBuf::from_iter([stream, ".stream.json"]);
+ let stream_metadata = storage.get_object(&path).await?;
+ let stream_metadata: serde_json::Value =
+ serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
+
+ let maybe_v1 = stream_metadata
+ .as_object()
+ .and_then(|meta| meta.get("version"))
+ .and_then(|version| version.as_str());
+
+ if matches!(maybe_v1, Some("v1")) {
+ let new_stream_metadata = stream_metadata_migration::v1_v2(stream_metadata);
+ storage
+ .put_object(&path, to_bytes(&new_stream_metadata))
+ .await?;
+
+ let schema_path = RelativePathBuf::from_iter([stream, ".schema"]);
+ let schema = storage.get_object(&schema_path).await?;
+ let schema = serde_json::from_slice(&schema).ok();
+ let map = schema_migration::v1_v2(schema)?;
+ storage.put_object(&schema_path, to_bytes(&map)).await?;
+ }
+
+ Ok(())
+}
+
+#[inline(always)]
+fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
+ serde_json::to_vec(any)
+ .map(|any| any.into())
+ .expect("serialize cannot fail")
+}
diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs
new file mode 100644
index 000000000..18a301ec3
--- /dev/null
+++ b/server/src/migration/schema_migration.rs
@@ -0,0 +1,52 @@
+/*
+* Parseable Server (C) 2022 - 2023 Parseable, Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <https://www.gnu.org/licenses/>.
+*
+*
+*/
+
+use std::collections::HashMap;
+
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use itertools::Itertools;
+
+pub(super) fn v1_v2(schema: Option<Schema>) -> anyhow::Result<HashMap<String, Schema>> {
+ let Some(schema) = schema else { return Ok(HashMap::new()) };
+ let schema = Schema::try_merge(vec![
+ Schema::new(vec![Field::new(
+ "p_timestamp",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ true,
+ )]),
+ schema,
+ ])?;
+
+ let list_of_fields = schema
+ .fields()
+ .iter()
+ // skip p_timestamp
+ .skip(1)
+ .map(|f| f.name())
+ .sorted();
+
+ let mut hasher = xxhash_rust::xxh3::Xxh3::new();
+ list_of_fields.for_each(|field| hasher.update(field.as_bytes()));
+ let hash = hasher.digest();
+ let key = format!("{hash:x}");
+
+ let mut map = HashMap::new();
+ map.insert(key, schema);
+ Ok(map)
+}
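// Illustrative sketch, not part of the diff: a v1 stream with a single schema becomes a
// one-entry map whose key is the xxh3 hash of the sorted field names (p_timestamp is
// skipped) and whose value is the old schema merged with the p_timestamp field, matching
// what get_schema_key produces on the ingest path.
#[cfg(test)]
mod v1_v2_sketch {
    use arrow_schema::{DataType, Field, Schema};

    #[test]
    fn single_v1_schema_becomes_one_keyed_entry() {
        let v1 = Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("msg", DataType::Utf8, true),
        ]);
        let map = super::v1_v2(Some(v1)).unwrap();
        assert_eq!(map.len(), 1);
        let migrated = map.values().next().unwrap();
        assert!(migrated.field_with_name("p_timestamp").is_ok());
    }
}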
diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs
new file mode 100644
index 000000000..386056482
--- /dev/null
+++ b/server/src/migration/stream_metadata_migration.rs
@@ -0,0 +1,33 @@
+/*
+* Parseable Server (C) 2022 - 2023 Parseable, Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <https://www.gnu.org/licenses/>.
+*
+*
+*/
+
+use serde_json::{json, Value};
+
+pub fn v1_v2(mut stream_metadata: Value) -> Value {
+ let default_stats = json!({
+ "events": 0,
+ "ingestion": 0,
+ "storage": 0
+ });
+ let stream_metadata_map = stream_metadata.as_object_mut().unwrap();
+ stream_metadata_map.entry("stats").or_insert(default_stats);
+ stream_metadata_map.insert("version".to_owned(), Value::String("v2".into()));
+ stream_metadata_map.insert("objectstore-format".to_owned(), Value::String("v2".into()));
+ stream_metadata
+}
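// Illustrative sketch, not part of the diff: v1_v2 only rewrites the version markers and
// backfills zeroed stats when the v1 file had none; every other key in .stream.json
// passes through untouched (real files carry more keys than this minimal input).
#[cfg(test)]
mod v1_v2_sketch {
    use serde_json::json;

    #[test]
    fn version_markers_are_rewritten() {
        let v1 = json!({
            "version": "v1",
            "objectstore-format": "v1"
        });
        let v2 = super::v1_v2(v1);
        assert_eq!(v2["version"], "v2");
        assert_eq!(v2["objectstore-format"], "v2");
        assert_eq!(v2["stats"]["events"], 0);
    }
}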
diff --git a/server/src/query.rs b/server/src/query.rs
index b6c5ae20b..abb25d1fb 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -24,10 +24,11 @@ use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
+use itertools::Itertools;
use serde_json::Value;
+use std::collections::hash_map::RandomState;
use std::collections::HashSet;
-use std::path::Path;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::option::CONFIG;
@@ -49,7 +50,7 @@ fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
pub struct Query {
pub query: String,
pub stream_name: String,
- pub schema: Arc<Schema>,
+ pub merged_schema: Arc<Schema>,
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
}
@@ -67,38 +68,45 @@ impl Query {
}
/// Return prefixes, each per day/hour/minutes as necessary
+ fn _get_prefixes(&self) -> Vec<String> {
+ TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes()
+ }
+
pub fn get_prefixes(&self) -> Vec<String> {
- TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY)
- .generate_prefixes(&self.stream_name)
+ self._get_prefixes()
+ .into_iter()
+ .map(|key| format!("{}/{}", self.stream_name, key))
+ .collect()
+ }
+
+ pub fn get_schema(&self) -> &Schema {
+ &self.merged_schema
}
/// Execute query on object storage(and if necessary on cache as well) with given stream information
/// TODO: find a way to query all selected parquet files together in a single context.
pub async fn execute(
&self,
- storage: &(impl ObjectStorage + ?Sized),
+ storage: Arc<dyn ObjectStorage>,
) -> Result<Vec<RecordBatch>, ExecuteError> {
let dir = StorageDir::new(&self.stream_name);
-
// take a look at local dir and figure out what local cache we could use for this query
- let arrow_files: Vec<PathBuf> = dir
- .arrow_files()
+ let staging_arrows = dir
+ .arrow_files_grouped_by_time()
.into_iter()
- .filter(|path| path_intersects_query(path, self.start, self.end))
- .collect();
+ .filter(|(path, _)| path_intersects_query(path, self.start, self.end))
+ .sorted_by(|(a, _), (b, _)| Ord::cmp(a, b))
+ .collect_vec();
- let possible_parquet_files = arrow_files.iter().cloned().map(|mut path| {
- path.set_extension("parquet");
- path
- });
+ let staging_parquet_set: HashSet<&PathBuf, RandomState> =
+ HashSet::from_iter(staging_arrows.iter().map(|(p, _)| p));
- let parquet_files = dir
+ let other_staging_parquet = dir
.parquet_files()
.into_iter()
- .filter(|path| path_intersects_query(path, self.start, self.end));
-
- let parquet_files: HashSet<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
- let parquet_files = Vec::from_iter(parquet_files.into_iter());
+ .filter(|path| path_intersects_query(path, self.start, self.end))
+ .filter(|path| !staging_parquet_set.contains(path))
+ .collect_vec();
let ctx = SessionContext::with_config_rt(
SessionConfig::default(),
@@ -106,11 +114,13 @@ impl Query {
);
let table = Arc::new(QueryTableProvider::new(
- arrow_files,
- parquet_files,
- storage.query_table(self)?,
- Arc::clone(&self.schema),
+ staging_arrows,
+ other_staging_parquet,
+ self.get_prefixes(),
+ storage,
+ Arc::new(self.get_schema().clone()),
));
+
ctx.register_table(
&*self.stream_name,
Arc::clone(&table) as Arc<dyn TableProvider>,
@@ -135,10 +145,21 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
.to_str()
.expect("filename is valid");
- // substring of filename i.e date=xxxx.hour=xx.minute=xx
- let prefix = &prefix[..33];
- Utc.datetime_from_str(prefix, "date=%F.hour=%H.minute=%M")
- .expect("valid prefix is parsed")
+ // Next three in order will be date, hour and minute
+ let mut components = prefix.splitn(3, '.');
+
+ let date = components.next().expect("date=xxxx-xx-xx");
+ let hour = components.next().expect("hour=xx");
+ let minute = components.next().expect("minute=xx");
+
+ let year = date[5..9].parse().unwrap();
+ let month = date[10..12].parse().unwrap();
+ let day = date[13..15].parse().unwrap();
+ let hour = hour[5..7].parse().unwrap();
+ let minute = minute[7..9].parse().unwrap();
+
+ Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
+ .unwrap()
}
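// Worked example, not part of the diff: a staging parquet named
//   date=2022-01-01.hour=00.minute=00.hostname.data.parquet
// splits into "date=2022-01-01", "hour=00" and "minute=00.hostname.data"; the fixed
// slices then pick out 2022, 01, 01, 00 and 00, i.e. the epoch 1640995200 asserted in
// the existing test below ("hostname" is a placeholder here).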
pub mod error {
@@ -173,14 +194,8 @@ pub mod error {
#[cfg(test)]
mod tests {
- use super::{time_from_path, Query};
- use crate::{alerts::Alerts, metadata::STREAM_INFO};
- use datafusion::arrow::datatypes::Schema;
- use datafusion::arrow::datatypes::{DataType, Field};
- use rstest::*;
- use serde_json::Value;
+ use super::time_from_path;
use std::path::PathBuf;
- use std::str::FromStr;
#[test]
fn test_time_from_parquet_path() {
@@ -188,65 +203,4 @@ mod tests {
let time = time_from_path(path.as_path());
assert_eq!(time.timestamp(), 1640995200);
}
-
- // Query prefix generation tests
- #[fixture]
- fn schema() -> Schema {
- let field_a = Field::new("a", DataType::Int64, false);
- let field_b = Field::new("b", DataType::Boolean, false);
- Schema::new(vec![field_a, field_b])
- }
-
- fn clear_map() {
- STREAM_INFO.write().unwrap().clear();
- }
-
- // A query can only be performed on streams with a valid schema
- #[rstest]
- #[case(
- r#"{
- "query": "SELECT * FROM stream_name",
- "startTime": "2022-10-15T10:00:00+00:00",
- "endTime": "2022-10-15T10:01:00+00:00"
- }"#,
- &["stream_name/date=2022-10-15/hour=10/minute=00/"]
- )]
- #[case(
- r#"{
- "query": "SELECT * FROM stream_name",
- "startTime": "2022-10-15T10:00:00+00:00",
- "endTime": "2022-10-15T10:02:00+00:00"
- }"#,
- &["stream_name/date=2022-10-15/hour=10/minute=00/", "stream_name/date=2022-10-15/hour=10/minute=01/"]
- )]
- #[serial_test::serial]
- fn query_parse_prefix_with_some_schema(#[case] prefix: &str, #[case] right: &[&str]) {
- clear_map();
- STREAM_INFO.add_stream("stream_name".to_string(), Some(schema()), Alerts::default());
-
- let query = Value::from_str(prefix).unwrap();
- let query = Query::parse(query).unwrap();
- assert_eq!(&query.stream_name, "stream_name");
- let prefixes = query.get_prefixes();
- let left = prefixes.iter().map(String::as_str).collect::<Vec<&str>>();
- assert_eq!(left.as_slice(), right);
- }
-
- // If there is no schema for this stream then parsing a Query should fail
- #[rstest]
- #[case(
- r#"{
- "query": "SELECT * FROM stream_name",
- "startTime": "2022-10-15T10:00:00+00:00",
- "endTime": "2022-10-15T10:01:00+00:00"
- }"#
- )]
- #[serial_test::serial]
- fn query_parse_prefix_with_no_schema(#[case] prefix: &str) {
- clear_map();
- STREAM_INFO.add_stream("stream_name".to_string(), None, Alerts::default());
-
- let query = Value::from_str(prefix).unwrap();
- assert!(Query::parse(query).is_err());
- }
}
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
index e6aff729b..b09bb405b 100644
--- a/server/src/query/table_provider.rs
+++ b/server/src/query/table_provider.rs
@@ -32,36 +32,47 @@ use datafusion::logical_expr::TableType;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
+use itertools::Itertools;
use std::any::Any;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
+use crate::storage::ObjectStorage;
+
pub struct QueryTableProvider {
- arrow_files: Vec<PathBuf>,
- parquet_files: Vec<PathBuf>,
- storage: Option<ListingTable>,
+ // parquet - ( arrow files )
+ staging_arrows: Vec<(PathBuf, Vec<PathBuf>)>,
+ other_staging_parquet: Vec<PathBuf>,
+ storage_prefixes: Vec<String>,
+ storage: Arc<dyn ObjectStorage>,
schema: Arc<Schema>,
}
impl QueryTableProvider {
pub fn new(
- arrow_files: Vec<PathBuf>,
- parquet_files: Vec<PathBuf>,
- storage: Option<ListingTable>,
+ staging_arrows: Vec<(PathBuf, Vec<PathBuf>)>,
+ other_staging_parquet: Vec<PathBuf>,
+ storage_prefixes: Vec<String>,
+ storage: Arc<dyn ObjectStorage>,
schema: Arc<Schema>,
) -> Self {
// By the time this query executes the arrow files could be converted to parquet files
// we want to preserve these files as well in case
-
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
- for file in &parquet_files {
+
+ for file in staging_arrows
+ .iter()
+ .map(|(p, _)| p)
+ .chain(other_staging_parquet.iter())
+ {
parquet_cached.upsert(file)
}
Self {
- arrow_files,
- parquet_files,
+ staging_arrows,
+ other_staging_parquet,
+ storage_prefixes,
storage,
schema,
}
@@ -75,10 +86,14 @@ impl QueryTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
- let mut parquet_files = self.parquet_files.clone();
- for file in &self.arrow_files {
- load_arrows(file, &mut mem_records, &mut parquet_files);
+ let mut parquet_files = Vec::new();
+
+ for (staging_parquet, arrow_files) in &self.staging_arrows {
+ if !load_arrows(arrow_files, &self.schema, &mut mem_records) {
+ parquet_files.push(staging_parquet.clone())
+ }
}
+ parquet_files.extend(self.other_staging_parquet.clone());
let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
let memexec = memtable.scan(ctx, projection, filters, limit).await?;
@@ -86,13 +101,22 @@ impl QueryTableProvider {
let cache_exec = if parquet_files.is_empty() {
memexec
} else {
- let listtable = local_parquet_table(&parquet_files, &self.schema)?;
- let listexec = listtable.scan(ctx, projection, filters, limit).await?;
- Arc::new(UnionExec::new(vec![memexec, listexec]))
+ match local_parquet_table(&parquet_files, &self.schema) {
+ Some(table) => {
+ let listexec = table.scan(ctx, projection, filters, limit).await?;
+ Arc::new(UnionExec::new(vec![memexec, listexec]))
+ }
+ None => memexec,
+ }
};
let mut exec = vec![cache_exec];
- if let Some(ref storage_listing) = self.storage {
+
+ let table = self
+ .storage
+ .query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?;
+
+ if let Some(ref storage_listing) = table {
exec.push(
storage_listing
.scan(ctx, projection, filters, limit)
@@ -107,7 +131,12 @@ impl QueryTableProvider {
impl Drop for QueryTableProvider {
fn drop(&mut self) {
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
- for file in &self.parquet_files {
+ for file in self
+ .staging_arrows
+ .iter()
+ .map(|(p, _)| p)
+ .chain(self.other_staging_parquet.iter())
+ {
parquet_cached.remove(file)
}
}
@@ -139,10 +168,7 @@ impl TableProvider for QueryTableProvider {
}
}
-fn local_parquet_table(
- parquet_files: &[PathBuf],
- schema: &SchemaRef,
-) -> Result<ListingTable, DataFusionError> {
+fn local_parquet_table(parquet_files: &[PathBuf], schema: &SchemaRef) -> Option<ListingTable> {
let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
@@ -153,37 +179,45 @@ fn local_parquet_table(
let paths = parquet_files
.iter()
- .map(|path| {
+ .flat_map(|path| {
ListingTableUrl::parse(path.to_str().expect("path should is valid unicode"))
- .expect("path is valid for filesystem listing")
})
- .collect();
+ .collect_vec();
+
+ if paths.is_empty() {
+ return None;
+ }
let config = ListingTableConfig::new_with_multi_paths(paths)
.with_listing_options(listing_options)
.with_schema(Arc::clone(schema));
- ListingTable::try_new(config)
+ match ListingTable::try_new(config) {
+ Ok(table) => Some(table),
+ Err(err) => {
+ log::error!("Local parquet query failed due to err: {err}");
+ None
+ }
+ }
}
fn load_arrows(
- file: &PathBuf,
+ files: &[PathBuf],
+ schema: &Schema,
mem_records: &mut Vec<Vec<RecordBatch>>,
- parquet_files: &mut Vec<PathBuf>,
-) {
- let Ok(arrow_file) = File::open(file) else { return; };
- let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return; };
- let records = reader
- .filter_map(|record| match record {
- Ok(record) => Some(record),
- Err(e) => {
- log::warn!("warning from arrow stream {:?}", e);
- None
- }
- })
- .collect();
+) -> bool {
+ let mut stream_readers = Vec::with_capacity(files.len());
+
+ for file in files {
+ let Ok(arrow_file) = File::open(file) else { return false; };
+ let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return false; };
+ stream_readers.push(reader);
+ }
+
+ let reader = crate::storage::MergedRecordReader {
+ readers: stream_readers,
+ };
+ let records = reader.merged_iter(schema).collect();
mem_records.push(records);
- let mut file = file.clone();
- file.set_extension("parquet");
- parquet_files.retain(|p| p != &file);
+ true
}
diff --git a/server/src/storage.rs b/server/src/storage.rs
index ae6da4ed3..1fb8666be 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -19,6 +19,7 @@
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
+use crate::stats::Stats;
use crate::storage::file_link::{FileLink, FileTable};
use crate::utils;
@@ -29,8 +30,9 @@ use datafusion::parquet::errors::ParquetError;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
use std::fs::create_dir_all;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
mod file_link;
@@ -40,6 +42,7 @@ mod s3;
mod store_metadata;
pub use localfs::{FSConfig, LocalFS};
+pub use object_storage::MergedRecordReader;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::{S3Config, S3};
pub use store_metadata::StorageMetadata;
@@ -66,13 +69,16 @@ const ACCESS_ALL: &str = "all";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObjectStoreFormat {
+ /// Version of schema registry
pub version: String,
+ /// Version for change in the way how parquet are generated/stored.
#[serde(rename = "objectstore-format")]
pub objectstore_format: String,
#[serde(rename = "created-at")]
pub created_at: String,
pub owner: Owner,
pub permissions: Vec<Permisssion>,
+ pub stats: Stats,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -107,11 +113,12 @@ impl Permisssion {
impl Default for ObjectStoreFormat {
fn default() -> Self {
Self {
- version: "v1".to_string(),
+ version: "v2".to_string(),
objectstore_format: "v1".to_string(),
created_at: Local::now().to_rfc3339(),
owner: Owner::new("".to_string(), "".to_string()),
permissions: vec![Permisssion::new("parseable".to_string())],
+ stats: Stats::default(),
}
}
}
@@ -205,7 +212,7 @@ impl StorageDir {
Self { data_path }
}
- fn filename_by_time(time: NaiveDateTime) -> String {
+ fn file_time_suffix(time: NaiveDateTime) -> String {
let uri = utils::date_to_prefix(time.date())
+ &utils::hour_to_prefix(time.hour())
+ &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap();
@@ -214,13 +221,18 @@ impl StorageDir {
format!("{local_uri}{hostname}.data.arrows")
}
- fn filename_by_current_time() -> String {
+ fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String {
+ format!("{}.{}", stream_hash, Self::file_time_suffix(time))
+ }
+
+ fn filename_by_current_time(stream_hash: &str) -> String {
let datetime = Utc::now();
- Self::filename_by_time(datetime.naive_utc())
+ Self::filename_by_time(stream_hash, datetime.naive_utc())
}
- pub fn path_by_current_time(&self) -> PathBuf {
- self.data_path.join(Self::filename_by_current_time())
+ pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf {
+ self.data_path
+ .join(Self::filename_by_current_time(stream_hash))
}
pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -236,6 +248,48 @@ impl StorageDir {
paths
}
+ pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
+ // hashmap
+ let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
+ let arrow_files = self.arrow_files();
+ for arrow_file_path in arrow_files {
+ let key = Self::arrow_path_to_parquet(&arrow_file_path);
+ grouped_arrow_file
+ .entry(key)
+ .or_default()
+ .push(arrow_file_path);
+ }
+
+ grouped_arrow_file
+ }
+
+ pub fn arrow_files_grouped_exclude_time(
+ &self,
+ exclude: NaiveDateTime,
+ ) -> HashMap<PathBuf, Vec<PathBuf>> {
+ let hot_filename = StorageDir::file_time_suffix(exclude);
+ // hashmap but exclude where hotfilename matches
+ let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
+ let mut arrow_files = self.arrow_files();
+ arrow_files.retain(|path| {
+ !path
+ .file_name()
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .ends_with(&hot_filename)
+ });
+ for arrow_file_path in arrow_files {
+ let key = Self::arrow_path_to_parquet(&arrow_file_path);
+ grouped_arrow_file
+ .entry(key)
+ .or_default()
+ .push(arrow_file_path);
+ }
+
+ grouped_arrow_file
+ }
+
pub fn parquet_files(&self) -> Vec<PathBuf> {
let Ok(dir) = self.data_path
.read_dir() else { return vec![] };
@@ -245,12 +299,19 @@ impl StorageDir {
.filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet")))
.collect()
}
+
+ fn arrow_path_to_parquet(path: &Path) -> PathBuf {
+ let filename = path.file_name().unwrap().to_str().unwrap();
+ let (_, filename) = filename.split_once('.').unwrap();
+ let mut parquet_path = path.to_owned();
+ parquet_path.set_file_name(filename);
+ parquet_path.set_extension("parquet");
+ parquet_path
+ }
}
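// Worked example, not part of the diff: with the schema-keyed naming, two staged files
// such as (keys are placeholder hashes)
//   key_a.date=2022-01-01.hour=00.minute=00.hostname.data.arrows
//   key_b.date=2022-01-01.hour=00.minute=00.hostname.data.arrows
// both map through arrow_path_to_parquet (strip everything before the first '.', swap the
// extension) to the single target
//   date=2022-01-01.hour=00.minute=00.hostname.data.parquet
// which is why arrow_files_grouped_by_time can collect them under one parquet key.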
#[derive(Debug, thiserror::Error)]
pub enum MoveDataError {
- #[error("Unable to Open file after moving")]
- Open,
#[error("Unable to create recordbatch stream")]
Arrow(#[from] ArrowError),
#[error("Could not generate parquet file")]
@@ -259,8 +320,6 @@ pub enum MoveDataError {
ObjectStorage(#[from] ObjectStorageError),
#[error("Could not generate parquet file")]
Create,
- #[error("Could not delete temp arrow file")]
- Delete,
}
#[derive(Debug, thiserror::Error)]
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index 277db8601..299770486 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -21,6 +21,7 @@ use std::{
sync::Arc,
};
+use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{
@@ -28,6 +29,7 @@ use datafusion::{
file_format::parquet::ParquetFormat,
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
+ error::DataFusionError,
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
};
use fs_extra::file::{move_file, CopyOptions};
@@ -36,7 +38,7 @@ use relative_path::RelativePath;
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;
-use crate::{option::validation, query::Query, utils::validate_path_is_writeable};
+use crate::{option::validation, utils::validate_path_is_writeable};
use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
@@ -166,9 +168,12 @@ impl ObjectStorage for LocalFS {
Ok(())
}
- fn query_table(&self, query: &Query) -> Result<Option<ListingTable>, ObjectStorageError> {
- let prefixes: Vec<ListingTableUrl> = query
- .get_prefixes()
+ fn query_table(
+ &self,
+ prefixes: Vec<String>,
+ schema: Arc<Schema>,
+ ) -> Result<Option<ListingTable>, DataFusionError> {
+ let prefixes: Vec<ListingTableUrl> = prefixes
.into_iter()
.filter_map(|prefix| {
let path = self.root.join(prefix);
@@ -191,7 +196,7 @@ impl ObjectStorage for LocalFS {
let config = ListingTableConfig::new_with_multi_paths(prefixes)
.with_listing_options(listing_options)
- .with_schema(Arc::clone(&query.schema));
+ .with_schema(schema);
Ok(Some(ListingTable::try_new(config)?))
}
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index c946ec302..fe6678137 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -20,17 +20,25 @@ use super::{
file_link::CacheState, LogStream, MoveDataError, ObjectStorageError, ObjectStoreFormat,
Permisssion, StorageDir, StorageMetadata, CACHED_FILES,
};
-use crate::{alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats};
+use crate::{
+ alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, stats::Stats,
+ utils::batch_adapter::adapt_batch,
+};
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{
- arrow::ipc::reader::StreamReader,
+ arrow::{
+ array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch,
+ },
datasource::listing::ListingTable,
+ error::DataFusionError,
execution::runtime_env::RuntimeEnv,
parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
};
+use itertools::kmerge_by;
+use lazy_static::__Deref;
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
use serde::Serialize;
@@ -39,7 +47,8 @@ use serde_json::Value;
use std::{
collections::HashMap,
fs::{self, File},
- path::Path,
+ path::{Path, PathBuf},
+ process,
sync::Arc,
};
@@ -67,15 +76,22 @@ pub trait ObjectStorage: Sync + 'static {
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
- fn query_table(&self, query: &Query) -> Result<Option<ListingTable>, ObjectStorageError>;
+ fn query_table(
+ &self,
+ prefixes: Vec<String>,
+ schema: Arc<Schema>,
+ ) -> Result<Option<ListingTable>, DataFusionError>;
- async fn put_schema(
+ async fn put_schema_map(
&self,
stream_name: &str,
- schema: &Schema,
+ schema_map: &str,
) -> Result<(), ObjectStorageError> {
- self.put_object(&schema_path(stream_name), to_bytes(schema))
- .await?;
+ self.put_object(
+ &schema_path(stream_name),
+ Bytes::copy_from_slice(schema_map.as_bytes()),
+ )
+ .await?;
Ok(())
}
@@ -88,8 +104,12 @@ pub trait ObjectStorage: Sync + 'static {
let format_json = to_bytes(&format);
- self.put_object(&schema_path(stream_name), "".into())
- .await?;
+ self.put_object(
+ &schema_path(stream_name),
+ to_bytes(&HashMap::<String, Arc<Schema>>::new()),
+ )
+ .await?;
+
self.put_object(&stream_json_path(stream_name), format_json)
.await?;
@@ -125,9 +145,12 @@ pub trait ObjectStorage: Sync + 'static {
.await
}
- async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError> {
+ async fn get_schema_map(
+ &self,
+ stream_name: &str,
+ ) -> Result<HashMap<String, Arc<Schema>>, ObjectStorageError> {
let schema = self.get_object(&schema_path(stream_name)).await?;
- let schema = serde_json::from_slice(&schema).ok();
+ let schema = serde_json::from_slice(&schema).expect("schema map is valid json");
Ok(schema)
}
@@ -141,6 +164,14 @@ pub trait ObjectStorage: Sync + 'static {
}
}
+ async fn get_stream_metadata(
+ &self,
+ stream_name: &str,
+ ) -> Result<ObjectStoreFormat, ObjectStorageError> {
+ let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
+ Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"))
+ }
+
async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
let stream_metadata: Value =
@@ -171,6 +202,16 @@ pub trait ObjectStorage: Sync + 'static {
Ok(parseable_metadata)
}
+ async fn stream_exists(&self, stream_name: &str) -> Result<bool, ObjectStorageError> {
+ let res = self.get_object(&stream_json_path(stream_name)).await;
+
+ match res {
+ Ok(_) => Ok(true),
+ Err(ObjectStorageError::NoSuchKey(_)) => Ok(false),
+ Err(e) => Err(e),
+ }
+ }
+
async fn sync(&self) -> Result<(), MoveDataError> {
if !Path::new(&CONFIG.staging_dir()).exists() {
return Ok(());
@@ -181,51 +222,36 @@ pub trait ObjectStorage: Sync + 'static {
let mut stream_stats = HashMap::new();
for stream in &streams {
- // get dir
let dir = StorageDir::new(stream);
// walk dir, find all .arrows files and convert to parquet
-
- let mut arrow_files = dir.arrow_files();
// Do not include file which is being written to
- let hot_file = dir.path_by_current_time();
- let hot_filename = hot_file.file_name().expect("is a not none filename");
+ let time = chrono::Utc::now().naive_utc();
+ let staging_files = dir.arrow_files_grouped_exclude_time(time);
- arrow_files.retain(|file| {
- !file
- .file_name()
- .expect("is a not none filename")
- .eq(hot_filename)
- });
+ for (parquet_path, files) in staging_files {
+ let record_reader = MergedRecordReader::try_new(&files).unwrap();
- for file in arrow_files {
- let arrow_file = File::open(&file).map_err(|_| MoveDataError::Open)?;
- let reader = StreamReader::try_new(arrow_file, None)?;
- let schema = reader.schema();
- let records = reader.filter_map(|record| match record {
- Ok(record) => Some(record),
- Err(e) => {
- log::warn!("warning from arrow stream {:?}", e);
- None
- }
- });
-
- let mut parquet_path = file.clone();
- parquet_path.set_extension("parquet");
let mut parquet_table = CACHED_FILES.lock().unwrap();
let parquet_file =
fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
parquet_table.upsert(&parquet_path);
let props = WriterProperties::builder().build();
- let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
+ let schema = Arc::new(record_reader.merged_schema());
+ let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
- for ref record in records {
+ for ref record in record_reader.merged_iter(&schema) {
writer.write(record)?;
}
writer.close()?;
- fs::remove_file(file).map_err(|_| MoveDataError::Delete)?;
+ for file in files {
+ if fs::remove_file(file).is_err() {
+ log::error!("Failed to delete file. Unstable state");
+ process::abort()
+ }
+ }
}
for file in dir.parquet_files() {
@@ -303,6 +329,56 @@ pub trait ObjectStorage: Sync + 'static {
}
}
+#[derive(Debug)]
+pub struct MergedRecordReader {
+ pub readers: Vec<StreamReader<File>>,
+}
+
+impl MergedRecordReader {
+ pub fn try_new(files: &[PathBuf]) -> Result<Self, MoveDataError> {
+ let mut readers = Vec::with_capacity(files.len());
+
+ for file in files {
+ let reader = StreamReader::try_new(File::open(file).unwrap(), None)?;
+ readers.push(reader);
+ }
+
+ Ok(Self { readers })
+ }
+
+ pub fn merged_iter(self, schema: &Schema) -> impl Iterator<Item = RecordBatch> + '_ {
+ let adapted_readers = self
+ .readers
+ .into_iter()
+ .map(move |reader| reader.flatten().map(|batch| adapt_batch(schema, batch)));
+
+ kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| {
+ let a: &TimestampMillisecondArray = a
+ .column(0)
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap();
+
+ let b: &TimestampMillisecondArray = b
+ .column(0)
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap();
+
+ a.value(0) < b.value(0)
+ })
+ }
+
+ pub fn merged_schema(&self) -> Schema {
+ Schema::try_merge(
+ self.readers
+ .iter()
+ .map(|stream| stream.schema().deref().clone()),
+ )
+ .unwrap()
+ }
+}
+
#[inline(always)]
fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
serde_json::to_vec(any)
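
Note: MergedRecordReader above leans on itertools::kmerge_by to interleave several already-sorted arrow streams by the first value of their leading timestamp column. A tiny self-contained sketch of that merge primitive, with integers standing in for record batches:

```rust
use itertools::kmerge_by;

// k-way merge of pre-sorted runs: the closure returns true when `a` should be
// emitted before `b`, mirroring the first-timestamp comparison in
// MergedRecordReader::merged_iter (u64s stand in for RecordBatches here).
fn main() {
    let sorted_runs = vec![vec![1, 4, 9], vec![2, 3, 10], vec![5, 6, 7]];
    let merged: Vec<u64> = kmerge_by(sorted_runs, |a: &u64, b: &u64| a < b).collect();
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7, 9, 10]);
}
```
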
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 1ba6adc1e..8775b6ef8 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -16,6 +16,7 @@
*
*/
+use arrow_schema::Schema;
use async_trait::async_trait;
use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind};
use aws_sdk_s3::model::{CommonPrefix, Delete, ObjectIdentifier};
@@ -32,6 +33,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::object_store::ObjectStoreRegistry;
+use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use futures::StreamExt;
use http::Uri;
@@ -44,8 +46,6 @@ use std::iter::Iterator;
use std::path::Path;
use std::sync::Arc;
-use crate::query::Query;
-
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
use super::ObjectStorageProvider;
@@ -319,10 +319,13 @@ impl ObjectStorage for S3 {
Ok(())
}
- fn query_table(&self, query: &Query) -> Result<Option<ListingTable>, ObjectStorageError> {
+ fn query_table(
+ &self,
+ prefixes: Vec<String>,
+ schema: Arc<Schema>,
+ ) -> Result<Option<ListingTable>, DataFusionError> {
// Get all prefix paths and convert them into futures which yeilds ListingTableUrl
- let prefixes: Vec<ListingTableUrl> = query
- .get_prefixes()
+ let prefixes: Vec<ListingTableUrl> = prefixes
.into_iter()
.map(|prefix| {
let path = format!("s3://{}/{}", &self.bucket, prefix);
@@ -345,7 +348,7 @@ impl ObjectStorage for S3 {
let config = ListingTableConfig::new_with_multi_paths(prefixes)
.with_listing_options(listing_options)
- .with_schema(Arc::clone(&query.schema));
+ .with_schema(schema);
Ok(Some(ListingTable::try_new(config)?))
}
diff --git a/server/src/utils.rs b/server/src/utils.rs
index c49ba61a2..4b4fb91e8 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -16,97 +16,15 @@
*
*/
+pub mod batch_adapter;
+pub mod header_parsing;
+pub mod json;
+pub mod uid;
+pub mod update;
+
use std::path::Path;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
-use serde_json::{json, Value};
-
-pub fn flatten_json_body(body: &serde_json::Value) -> Result<Value, serde_json::Error> {
- let mut flat_value: Value = json!({});
- flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
- Ok(flat_value)
-}
-
-pub fn merge(value: &mut Value, fields: impl Iterator<Item = (String, Value)>) {
- if let Value::Object(m) = value {
- for (k, v) in fields {
- match m.get_mut(&k) {
- Some(val) => {
- *val = v;
- }
- None => {
- m.insert(k, v);
- }
- }
- }
- }
-}
-
-pub mod header_parsing {
- const MAX_HEADERS_ALLOWED: usize = 10;
- use actix_web::{HttpRequest, HttpResponse, ResponseError};
-
- pub fn collect_labelled_headers(
- req: &HttpRequest,
- prefix: &str,
- kv_separator: char,
- ) -> Result<String, ParseHeaderError> {
- // filter out headers which has right prefix label and convert them into str;
- let headers = req.headers().iter().filter_map(|(key, value)| {
- let key = key.as_str().strip_prefix(prefix)?;
- Some((key, value))
- });
-
- let mut labels: Vec<String> = Vec::new();
-
- for (key, value) in headers {
- let value = value.to_str().map_err(|_| ParseHeaderError::InvalidValue)?;
- if key.is_empty() {
- return Err(ParseHeaderError::Emptykey);
- }
- if key.contains(kv_separator) {
- return Err(ParseHeaderError::SeperatorInKey(kv_separator));
- }
- if value.contains(kv_separator) {
- return Err(ParseHeaderError::SeperatorInValue(kv_separator));
- }
-
- labels.push(format!("{key}={value}"));
- }
-
- if labels.len() > MAX_HEADERS_ALLOWED {
- return Err(ParseHeaderError::MaxHeadersLimitExceeded);
- }
-
- Ok(labels.join(&kv_separator.to_string()))
- }
-
- #[derive(Debug, thiserror::Error)]
- pub enum ParseHeaderError {
- #[error("Too many headers received. Limit is of 5 headers")]
- MaxHeadersLimitExceeded,
- #[error("A value passed in header can't be formatted to plain visible ASCII")]
- InvalidValue,
- #[error("Invalid Key was passed which terminated just after the end of prefix")]
- Emptykey,
- #[error("A key passed in header contains reserved char {0}")]
- SeperatorInKey(char),
- #[error("A value passed in header contains reserved char {0}")]
- SeperatorInValue(char),
- #[error("Stream name not found in header [x-p-stream]")]
- MissingStreamName,
- }
-
- impl ResponseError for ParseHeaderError {
- fn status_code(&self) -> http::StatusCode {
- http::StatusCode::BAD_REQUEST
- }
-
- fn error_response(&self) -> HttpResponse {
- HttpResponse::build(self.status_code()).body(self.to_string())
- }
- }
-}
#[allow(dead_code)]
pub fn hostname() -> Option<String> {
@@ -133,96 +51,6 @@ pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> {
Ok(())
}
-pub mod uid {
- use ulid::Ulid;
-
- pub type Uid = Ulid;
-
- pub fn gen() -> Ulid {
- Ulid::new()
- }
-}
-
-pub mod update {
- use crate::banner::about::current;
- use std::env;
- use std::{path::Path, time::Duration};
-
- use anyhow::anyhow;
- use chrono::{DateTime, Utc};
- use ulid::Ulid;
-
- use crate::storage::StorageMetadata;
-
- static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST";
-
- #[derive(Debug)]
- pub struct LatestRelease {
- pub version: semver::Version,
- pub date: DateTime,
- }
-
- fn is_docker() -> bool {
- Path::new("/.dockerenv").exists()
- }
-
- fn is_k8s() -> bool {
- env::var(K8S_ENV_TO_CHECK).is_ok()
- }
-
- fn platform() -> &'static str {
- if is_k8s() {
- "Kubernetes"
- } else if is_docker() {
- "Docker"
- } else {
- "Native"
- }
- }
-
- // User Agent for Download API call
- // Format: Parseable/<deployment uid>/<version>/<commit hash> (<OS>; <platform>)
- fn user_agent(uid: &Ulid) -> String {
- let info = os_info::get();
- format!(
- "Parseable/{}/{}/{} ({}; {})",
- uid,
- current().0,
- current().1,
- info.os_type(),
- platform()
- )
- }
-
- pub fn get_latest(meta: &StorageMetadata) -> Result<LatestRelease, anyhow::Error> {
- let agent = ureq::builder()
- .user_agent(user_agent(&meta.deployment_id).as_str())
- .timeout(Duration::from_secs(8))
- .build();
-
- let json: serde_json::Value = agent
- .get("https://download.parseable.io/latest-version")
- .call()?
- .into_json()?;
-
- let version = json["tag_name"]
- .as_str()
- .and_then(|ver| ver.strip_prefix('v'))
- .and_then(|ver| semver::Version::parse(ver).ok())
- .ok_or_else(|| anyhow!("Failed parsing version"))?;
-
- let date = json["published_at"]
- .as_str()
- .ok_or_else(|| anyhow!("Failed parsing published date"))?;
-
- let date = chrono::DateTime::parse_from_rfc3339(date)
- .expect("date-time from github is in rfc3339 format")
- .into();
-
- Ok(LatestRelease { version, date })
- }
-}
-
/// Convert minutes to a slot range
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
@@ -271,13 +99,9 @@ impl TimePeriod {
}
}
- pub fn generate_prefixes(&self, prefix: &str) -> Vec<String> {
- let prefix = format!("{prefix}/");
-
+ pub fn generate_prefixes(&self) -> Vec<String> {
let end_minute = self.end.minute() + u32::from(self.end.second() > 0);
-
self.generate_date_prefixes(
- &prefix,
self.start.date_naive(),
self.end.date_naive(),
(self.start.hour(), self.start.minute()),
@@ -371,7 +195,6 @@ impl TimePeriod {
pub fn generate_date_prefixes(
&self,
- prefix: &str,
start_date: NaiveDate,
end_date: NaiveDate,
start_time: (u32, u32),
@@ -381,7 +204,7 @@ impl TimePeriod {
let mut date = start_date;
while date <= end_date {
- let prefix = prefix.to_owned() + &date_to_prefix(date);
+ let prefix = date_to_prefix(date);
let is_start = date == start_date;
let is_end = date == end_date;
@@ -426,66 +249,66 @@ mod tests {
#[rstest]
#[case::same_minute(
"2022-06-11T16:30:00+00:00", "2022-06-11T16:30:59+00:00",
- &["stream_name/date=2022-06-11/hour=16/minute=30/"]
+ &["date=2022-06-11/hour=16/minute=30/"]
)]
#[case::same_hour_different_minute(
"2022-06-11T16:57:00+00:00", "2022-06-11T16:59:00+00:00",
&[
- "stream_name/date=2022-06-11/hour=16/minute=57/",
- "stream_name/date=2022-06-11/hour=16/minute=58/"
+ "date=2022-06-11/hour=16/minute=57/",
+ "date=2022-06-11/hour=16/minute=58/"
]
)]
#[case::same_hour_with_00_to_59_minute_block(
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
- &["stream_name/date=2022-06-11/hour=16/"]
+ &["date=2022-06-11/hour=16/"]
)]
#[case::same_date_different_hours_coherent_minute(
"2022-06-11T15:00:00+00:00", "2022-06-11T17:00:00+00:00",
&[
- "stream_name/date=2022-06-11/hour=15/",
- "stream_name/date=2022-06-11/hour=16/"
+ "date=2022-06-11/hour=15/",
+ "date=2022-06-11/hour=16/"
]
)]
#[case::same_date_different_hours_incoherent_minutes(
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
&[
- "stream_name/date=2022-06-11/hour=15/minute=59/",
- "stream_name/date=2022-06-11/hour=16/minute=00/"
+ "date=2022-06-11/hour=15/minute=59/",
+ "date=2022-06-11/hour=16/minute=00/"
]
)]
#[case::same_date_different_hours_whole_hours_between_incoherent_minutes(
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
&[
- "stream_name/date=2022-06-11/hour=15/minute=59/",
- "stream_name/date=2022-06-11/hour=16/",
- "stream_name/date=2022-06-11/hour=17/minute=00/"
+ "date=2022-06-11/hour=15/minute=59/",
+ "date=2022-06-11/hour=16/",
+ "date=2022-06-11/hour=17/minute=00/"
]
)]
#[case::different_date_coherent_hours_and_minutes(
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
&[
- "stream_name/date=2022-06-11/",
- "stream_name/date=2022-06-12/"
+ "date=2022-06-11/",
+ "date=2022-06-12/"
]
)]
#[case::different_date_incoherent_hours_coherent_minutes(
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
&[
- "stream_name/date=2022-06-11/hour=23/",
- "stream_name/date=2022-06-12/hour=00/",
- "stream_name/date=2022-06-12/hour=01/"
+ "date=2022-06-11/hour=23/",
+ "date=2022-06-12/hour=00/",
+ "date=2022-06-12/hour=01/"
]
)]
#[case::different_date_incoherent_hours_incoherent_minutes(
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
&[
- "stream_name/date=2022-06-11/hour=23/minute=59/",
- "stream_name/date=2022-06-12/hour=00/minute=00/"
+ "date=2022-06-11/hour=23/minute=59/",
+ "date=2022-06-12/hour=00/minute=00/"
]
)]
fn prefix_generation(#[case] start: &str, #[case] end: &str, #[case] right: &[&str]) {
let time_period = time_period_from_str(start, end);
- let prefixes = time_period.generate_prefixes("stream_name");
+ let prefixes = time_period.generate_prefixes();
let left = prefixes.iter().map(String::as_str).collect::<Vec<&str>>();
assert_eq!(left.as_slice(), right);
}
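
Note: since generate_prefixes no longer embeds the stream name, the caller composes the final object-store prefixes itself. The call site is not part of this diff, so the following is only an illustrative sketch:

```rust
// Hedged sketch: joining a stream name onto the stream-agnostic output of
// TimePeriod::generate_prefixes(). The literal matches the first test case above.
fn main() {
    let stream_name = "mystream";
    let time_prefixes = vec!["date=2022-06-11/hour=16/minute=30/".to_string()];
    let full: Vec<String> = time_prefixes
        .iter()
        .map(|p| format!("{stream_name}/{p}"))
        .collect();
    assert_eq!(full, vec!["mystream/date=2022-06-11/hour=16/minute=30/"]);
}
```
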
diff --git a/server/src/utils/batch_adapter.rs b/server/src/utils/batch_adapter.rs
new file mode 100644
index 000000000..7fdd2febe
--- /dev/null
+++ b/server/src/utils/batch_adapter.rs
@@ -0,0 +1,43 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use arrow_schema::Schema;
+use datafusion::arrow::array::new_null_array;
+use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::record_batch::RecordBatch;
+
+use std::sync::Arc;
+
+pub fn adapt_batch(table_schema: &Schema, batch: RecordBatch) -> RecordBatch {
+ let batch_schema = &*batch.schema();
+
+ let mut cols: Vec<ArrayRef> = Vec::with_capacity(table_schema.fields().len());
+ let batch_cols = batch.columns().to_vec();
+
+ for table_field in table_schema.fields() {
+ if let Some((batch_idx, _)) = batch_schema.column_with_name(table_field.name().as_str()) {
+ cols.push(Arc::clone(&batch_cols[batch_idx]));
+ } else {
+ cols.push(new_null_array(table_field.data_type(), batch.num_rows()))
+ }
+ }
+
+ let merged_schema = Arc::new(table_schema.clone());
+
+ RecordBatch::try_new(merged_schema, cols).unwrap()
+}
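
Note: a self-contained sketch of what adapt_batch does, with the padding logic re-implemented inline rather than calling the crate-private helper. A batch written against an older, narrower schema is padded with null columns so it lines up with the merged table schema:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::arrow::array::{new_null_array, Array, ArrayRef, Int64Array};
use datafusion::arrow::record_batch::RecordBatch;

fn main() {
    // merged table schema has two fields; the staged batch only carries "a"
    let table_schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    let a: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])),
        vec![a],
    )
    .unwrap();

    // pad columns missing from the batch with nulls, mirroring adapt_batch
    let batch_schema = batch.schema();
    let cols: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .map(|field| match batch_schema.column_with_name(field.name()) {
            Some((idx, _)) => Arc::clone(batch.column(idx)),
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();

    let adapted = RecordBatch::try_new(Arc::new(table_schema), cols).unwrap();
    assert_eq!(adapted.num_columns(), 2);
    assert_eq!(adapted.column(1).null_count(), 3); // "b" was padded with nulls
}
```
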
diff --git a/server/src/utils/header_parsing.rs b/server/src/utils/header_parsing.rs
new file mode 100644
index 000000000..0708804bf
--- /dev/null
+++ b/server/src/utils/header_parsing.rs
@@ -0,0 +1,81 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+const MAX_HEADERS_ALLOWED: usize = 10;
+use actix_web::{HttpRequest, HttpResponse, ResponseError};
+
+pub fn collect_labelled_headers(
+ req: &HttpRequest,
+ prefix: &str,
+ kv_separator: char,
+) -> Result<String, ParseHeaderError> {
+ // filter out headers which has right prefix label and convert them into str;
+ let headers = req.headers().iter().filter_map(|(key, value)| {
+ let key = key.as_str().strip_prefix(prefix)?;
+ Some((key, value))
+ });
+
+ let mut labels: Vec<String> = Vec::new();
+
+ for (key, value) in headers {
+ let value = value.to_str().map_err(|_| ParseHeaderError::InvalidValue)?;
+ if key.is_empty() {
+ return Err(ParseHeaderError::Emptykey);
+ }
+ if key.contains(kv_separator) {
+ return Err(ParseHeaderError::SeperatorInKey(kv_separator));
+ }
+ if value.contains(kv_separator) {
+ return Err(ParseHeaderError::SeperatorInValue(kv_separator));
+ }
+
+ labels.push(format!("{key}={value}"));
+ }
+
+ if labels.len() > MAX_HEADERS_ALLOWED {
+ return Err(ParseHeaderError::MaxHeadersLimitExceeded);
+ }
+
+ Ok(labels.join(&kv_separator.to_string()))
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ParseHeaderError {
+ #[error("Too many headers received. Limit is of 5 headers")]
+ MaxHeadersLimitExceeded,
+ #[error("A value passed in header can't be formatted to plain visible ASCII")]
+ InvalidValue,
+ #[error("Invalid Key was passed which terminated just after the end of prefix")]
+ Emptykey,
+ #[error("A key passed in header contains reserved char {0}")]
+ SeperatorInKey(char),
+ #[error("A value passed in header contains reserved char {0}")]
+ SeperatorInValue(char),
+ #[error("Stream name not found in header [x-p-stream]")]
+ MissingStreamName,
+}
+
+impl ResponseError for ParseHeaderError {
+ fn status_code(&self) -> http::StatusCode {
+ http::StatusCode::BAD_REQUEST
+ }
+
+ fn error_response(&self) -> HttpResponse {
+ HttpResponse::build(self.status_code()).body(self.to_string())
+ }
+}
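
Note: a hedged sketch of a unit test that could accompany this module; the header name is illustrative and nothing here is part of the diff itself.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use actix_web::test::TestRequest;

    // One prefixed header should become a single "key=value" label.
    #[test]
    fn collects_prefixed_header() {
        let req = TestRequest::default()
            .insert_header(("x-p-meta-tag", "value"))
            .to_http_request();
        let labels = collect_labelled_headers(&req, "x-p-meta-", '=').unwrap();
        assert_eq!(labels, "tag=value");
    }
}
```
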
diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs
new file mode 100644
index 000000000..f1d536d40
--- /dev/null
+++ b/server/src/utils/json.rs
@@ -0,0 +1,42 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use serde_json;
+use serde_json::json;
+use serde_json::Value;
+
+pub fn flatten_json_body(body: &serde_json::Value) -> Result<Value, serde_json::Error> {
+ let mut flat_value: Value = json!({});
+ flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
+ Ok(flat_value)
+}
+
+pub fn merge(value: &mut Value, fields: impl Iterator<Item = (String, Value)>) {
+ if let Value::Object(m) = value {
+ for (k, v) in fields {
+ match m.get_mut(&k) {
+ Some(val) => {
+ *val = v;
+ }
+ None => {
+ m.insert(k, v);
+ }
+ }
+ }
+ }
+}
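
Note: a hedged, crate-internal usage sketch of the flattening helper. The exact output shape depends on the flatten_json crate; collapsing host.name into host_name with the "_" separator is the assumed behaviour.

```rust
use serde_json::json;

use crate::utils::json::flatten_json_body;

// Crate-internal sketch: nested objects are collapsed into underscore-joined
// top-level keys before schema inference and ingestion.
fn example() {
    let body = json!({ "host": { "name": "web-01" }, "status": 200 });
    let flat = flatten_json_body(&body).unwrap();
    println!("{flat}"); // expected: {"host_name":"web-01","status":200}
}
```
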
diff --git a/server/src/utils/uid.rs b/server/src/utils/uid.rs
new file mode 100644
index 000000000..8a895d3bf
--- /dev/null
+++ b/server/src/utils/uid.rs
@@ -0,0 +1,25 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use ulid::Ulid;
+
+pub type Uid = Ulid;
+
+pub fn gen() -> Ulid {
+ Ulid::new()
+}
diff --git a/server/src/utils/update.rs b/server/src/utils/update.rs
new file mode 100644
index 000000000..9a0c7be89
--- /dev/null
+++ b/server/src/utils/update.rs
@@ -0,0 +1,95 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use crate::banner::about::current;
+use std::env;
+use std::{path::Path, time::Duration};
+
+use anyhow::anyhow;
+use chrono::{DateTime, Utc};
+use ulid::Ulid;
+
+use crate::storage::StorageMetadata;
+
+static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST";
+
+#[derive(Debug)]
+pub struct LatestRelease {
+ pub version: semver::Version,
+ pub date: DateTime<Utc>,
+}
+
+fn is_docker() -> bool {
+ Path::new("/.dockerenv").exists()
+}
+
+fn is_k8s() -> bool {
+ env::var(K8S_ENV_TO_CHECK).is_ok()
+}
+
+fn platform() -> &'static str {
+ if is_k8s() {
+ "Kubernetes"
+ } else if is_docker() {
+ "Docker"
+ } else {
+ "Native"
+ }
+}
+
+// User Agent for Download API call
+// Format: Parseable/<deployment uid>/<version>/<commit hash> (<OS>; <platform>)
+fn user_agent(uid: &Ulid) -> String {
+ let info = os_info::get();
+ format!(
+ "Parseable/{}/{}/{} ({}; {})",
+ uid,
+ current().0,
+ current().1,
+ info.os_type(),
+ platform()
+ )
+}
+
+pub fn get_latest(meta: &StorageMetadata) -> Result<LatestRelease, anyhow::Error> {
+ let agent = ureq::builder()
+ .user_agent(user_agent(&meta.deployment_id).as_str())
+ .timeout(Duration::from_secs(8))
+ .build();
+
+ let json: serde_json::Value = agent
+ .get("https://download.parseable.io/latest-version")
+ .call()?
+ .into_json()?;
+
+ let version = json["tag_name"]
+ .as_str()
+ .and_then(|ver| ver.strip_prefix('v'))
+ .and_then(|ver| semver::Version::parse(ver).ok())
+ .ok_or_else(|| anyhow!("Failed parsing version"))?;
+
+ let date = json["published_at"]
+ .as_str()
+ .ok_or_else(|| anyhow!("Failed parsing published date"))?;
+
+ let date = chrono::DateTime::parse_from_rfc3339(date)
+ .expect("date-time from github is in rfc3339 format")
+ .into();
+
+ Ok(LatestRelease { version, date })
+}
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 81ebb80a9..75a0dbc5a 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -160,17 +160,18 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, QueryValidationError> {
- let schema = match STREAM_INFO.schema(&stream_name)? {
- Some(schema) => Arc::new(schema),
- None => return Err(QueryValidationError::UninitializedStream),
- };
+ if !STREAM_INFO.stream_initialized(&stream_name)? {
+ return Err(QueryValidationError::UninitializedStream);
+ }
+
+ let merged_schema = Arc::new(STREAM_INFO.merged_schema(&stream_name)?);
Ok(Query {
stream_name: tokens[stream_name_index].to_string(),
start,
end,
query: query.to_string(),
- schema,
+ merged_schema,
})
}
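
Note: the merged_schema handed to the query path is a union of every schema version recorded for the stream, presumably built with arrow's Schema::try_merge just as MergedRecordReader::merged_schema does above. A small sketch of that union:

```rust
use arrow_schema::{DataType, Field, Schema};

// Two schema versions of the same stream merge into their field superset.
fn main() {
    let v1 = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let v2 = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    let merged = Schema::try_merge([v1, v2]).unwrap();
    assert_eq!(merged.fields().len(), 2);
}
```
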