From 43ae37c941e56a124b4c925a26d70ba03447608e Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 24 Jan 2023 15:20:56 +0530 Subject: [PATCH 01/22] Implement flexible event ingestion and querying This PR adds the capability to ingest events with varying schemas. During ingestion, events with the same schema are stored in separate files which are then merged into a single Parquet file. This allows for schema to dynamically change over time without having any impact on performance. Fixes #195 --- server/Cargo.toml | 2 + server/src/event.rs | 186 +++++------------------- server/src/event/writer.rs | 204 +++++++++++++++++++++++++++ server/src/handlers/event.rs | 8 +- server/src/handlers/logstream.rs | 58 +++----- server/src/metadata.rs | 54 +++---- server/src/query.rs | 6 +- server/src/storage.rs | 15 +- server/src/storage/object_storage.rs | 44 ++++-- server/src/validator.rs | 29 ++-- 10 files changed, 365 insertions(+), 241 deletions(-) create mode 100644 server/src/event/writer.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index 27523eac4..d7326a70e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -65,6 +65,8 @@ actix-web-static-files = "4.0" static-files = "0.2.1" ulid = { version = "1.0", features = ["serde"] } ureq = { version = "2.5.0", features = ["json"] } +dashmap = "5.4.0" +hex = "0.4.3" [build-dependencies] static-files = "0.2.1" diff --git a/server/src/event.rs b/server/src/event.rs index 1dcd10bf0..9f7404bf5 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -1,3 +1,5 @@ +mod writer; + /* * Parseable Server (C) 2022 - 2023 Parseable, Inc. * @@ -22,163 +24,40 @@ use chrono::{DateTime, Utc}; use datafusion::arrow::array::{Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::error::ArrowError; -use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions}; use datafusion::arrow::record_batch::RecordBatch; -use lazy_static::lazy_static; +use md5::Digest; use serde_json::Value; use std::collections::HashMap; -use std::fs::OpenOptions; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use std::sync::Mutex; -use std::sync::MutexGuard; -use std::sync::RwLock; use crate::metadata; use crate::metadata::LOCK_EXPECT; use crate::option::CONFIG; -use crate::storage::StorageDir; - -use self::error::{EventError, StreamWriterError}; -type LocalWriter = Mutex>>; -type LocalWriterGuard<'a> = MutexGuard<'a, Option>>; +use self::error::EventError; +pub use self::writer::STREAM_WRITERS; const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; const TIME_KEYS: &[&str] = &["time", "date", "datetime", "timestamp"]; -lazy_static! 
{ - #[derive(Default)] - pub static ref STREAM_WRITERS: RwLock> = RwLock::new(HashMap::new()); -} - -impl STREAM_WRITERS { - // append to a existing stream - fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> { - let hashmap_guard = STREAM_WRITERS - .read() - .map_err(|_| StreamWriterError::RwPoisoned)?; - - match hashmap_guard.get(stream) { - Some(localwriter) => { - let mut writer_guard = localwriter - .lock() - .map_err(|_| StreamWriterError::MutexPoisoned)?; - - // if it's some writer then we write without dropping any lock - // hashmap cannot be brought mutably at any point until this finishes - if let Some(ref mut writer) = *writer_guard { - writer.write(record).map_err(StreamWriterError::Writer)?; - } else { - // pass on this mutex to set entry so that it can be reused - // we have a guard for underlying entry thus - // hashmap must not be availible as mutable to any other thread - STREAM_WRITERS::set_entry(writer_guard, stream, record)?; - } - } - // entry is not present thus we create it - None => { - // this requires mutable borrow of the map so we drop this read lock and wait for write lock - drop(hashmap_guard); - STREAM_WRITERS::create_entry(stream.to_string(), record)?; - } - }; - Ok(()) - } - - // create a new entry with new stream_writer - // Only create entry for valid streams - fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> { - let mut hashmap_guard = STREAM_WRITERS - .write() - .map_err(|_| StreamWriterError::RwPoisoned)?; - - let stream_writer = init_new_stream_writer_file(&stream, record)?; - - hashmap_guard.insert(stream, Mutex::new(Some(stream_writer))); - - Ok(()) - } - - // Deleting a logstream requires that metadata is deleted first - pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> { - let mut hashmap_guard = STREAM_WRITERS - .write() - .map_err(|_| StreamWriterError::RwPoisoned)?; - - hashmap_guard.remove(stream); - - Ok(()) - } - - fn set_entry( - mut writer_guard: LocalWriterGuard, - stream: &str, - record: &RecordBatch, - ) -> Result<(), StreamWriterError> { - let stream_writer = init_new_stream_writer_file(stream, record)?; - - writer_guard.replace(stream_writer); // replace the stream writer behind this mutex - - Ok(()) - } - - pub fn unset_all() -> Result<(), StreamWriterError> { - let map = STREAM_WRITERS - .read() - .map_err(|_| StreamWriterError::RwPoisoned)?; - - for writer in map.values() { - if let Some(mut streamwriter) = writer - .lock() - .map_err(|_| StreamWriterError::MutexPoisoned)? 
- .take() - { - let _ = streamwriter.finish(); - } - } - - Ok(()) - } -} - -fn init_new_stream_writer_file( - stream_name: &str, - record: &RecordBatch, -) -> Result, StreamWriterError> { - let dir = StorageDir::new(stream_name); - let path = dir.path_by_current_time(); - - std::fs::create_dir_all(dir.data_path)?; - - let file = OpenOptions::new().create(true).append(true).open(path)?; - - let mut stream_writer = StreamWriter::try_new(file, &record.schema()) - .expect("File and RecordBatch both are checked"); - - stream_writer - .write(record) - .map_err(StreamWriterError::Writer)?; - - Ok(stream_writer) -} - #[derive(Clone)] pub struct Event { pub body: Value, pub stream_name: String, + pub schema_key: String, } // Events holds the schema related to a each event for a single log stream impl Event { pub async fn process(self) -> Result<(), EventError> { - let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?; + let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name, &self.schema_key)?; if let Some(schema) = stream_schema { let schema_ref = Arc::new(schema); // validate schema before processing the event - let Ok(mut event) = self.get_record(schema_ref.clone()) else { + let Ok(mut event) = self.get_record(Arc::clone(&schema_ref)) else { return Err(EventError::SchemaMismatch); }; @@ -246,11 +125,12 @@ impl Event { // - map always have an entry for this stream let stream_name = &self.stream_name; + let schema_key = &self.schema_key; let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT); // if the metadata is not none after acquiring lock // then some other thread has already completed this function. - if _schema_with_map(stream_name, &stream_metadata).is_some() { + if _schema_with_map(stream_name, schema_key, &stream_metadata).is_some() { // drop the lock drop(stream_metadata); // Try to post event usual way @@ -265,7 +145,15 @@ impl Event { self.process_event(event)?; log::info!("schema is set in memory map for logstream {}", stream_name); - _set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata); + _set_schema_with_map(stream_name, schema_key, schema, &mut stream_metadata); + + let schema_map = serde_json::to_string( + &stream_metadata + .get(stream_name) + .expect("map has entry for this stream name") + .schema, + ) + .expect("map of schemas is serializable"); // drop mutex before going across await point drop(stream_metadata); @@ -277,7 +165,7 @@ impl Event { let stream_name = stream_name.clone(); spawn(async move { - if let Err(e) = storage.put_schema(&stream_name, &schema).await { + if let Err(e) = storage.put_schema_map(&stream_name, &schema_map).await { // If this call has failed then currently there is no right way to make local state consistent // this needs a fix after more constraints are safety guarentee is provided by localwriter and objectstore_sync. // Reasoning - @@ -299,7 +187,7 @@ impl Event { // event process all events after the 1st event. Concatenates record batches // and puts them in memory store for each event. 
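Both the first-event path above and process_event below route a record by the (stream_name, schema_key) pair. The key itself comes from the get_schema_key helper added further down in this file: an MD5 digest over the event's sorted top-level field names, so two events with the same set of fields share a key regardless of field order or values. A minimal, self-contained sketch of that derivation, mirroring the md5/hex calls the patch itself uses; the function name and example payloads are illustrative only:

    use md5::{Digest, Md5};
    use serde_json::{json, Value};

    // Hash the sorted top-level keys of a JSON event; events with the
    // same field set map to the same schema key.
    fn schema_key_of(body: &Value) -> String {
        let mut fields: Vec<_> = body.as_object().expect("object body").keys().collect();
        fields.sort();
        let mut hasher = Md5::new();
        for field in fields {
            hasher.update(field.as_bytes());
        }
        hex::encode(hasher.finalize())
    }

    fn main() {
        let a = json!({"level": "info", "msg": "started"});
        let b = json!({"msg": "stopped", "level": "warn"});
        // Same field set, so both events land in the same local writer and,
        // eventually, in the same schema group of the merged Parquet output.
        assert_eq!(schema_key_of(&a), schema_key_of(&b));
    }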
fn process_event(&self, rb: &RecordBatch) -> Result<(), EventError> { - STREAM_WRITERS::append_to_local(&self.stream_name, rb)?; + STREAM_WRITERS::append_to_local(&self.stream_name, &self.schema_key, rb)?; Ok(()) } @@ -321,6 +209,17 @@ impl Event { } } +pub fn get_schema_key(body: &Value) -> String { + let mut list_of_fields: Vec<_> = body.as_object().unwrap().keys().collect(); + list_of_fields.sort(); + let mut hasher = md5::Md5::new(); + for field in list_of_fields { + hasher.update(field.as_bytes()) + } + + hex::encode(hasher.finalize()) +} + fn fields_mismatch(schema: &Schema, body: &Value) -> bool { for (name, val) in body.as_object().expect("body is of object variant") { let Ok(field) = schema.field_with_name(name) else { return true }; @@ -381,25 +280,28 @@ fn get_datetime_field(json: &Value) -> Option<&str> { #[inline] pub fn _schema_with_map( stream_name: &str, + schema_key: &str, map: &impl Deref>, ) -> Option { map.get(stream_name) .expect("map has entry for this stream name") .schema - .to_owned() + .get(schema_key) + .cloned() } #[inline] // Special functions which writes to metadata map while holding the lock pub fn _set_schema_with_map( stream_name: &str, + schema_key: &str, schema: Schema, map: &mut impl DerefMut>, ) { map.get_mut(stream_name) .expect("map has entry for this stream name") .schema - .replace(schema); + .insert(schema_key.to_string(), schema); } pub mod error { @@ -407,6 +309,8 @@ pub mod error { use crate::storage::ObjectStorageError; use datafusion::arrow::error::ArrowError; + use super::writer::errors::StreamWriterError; + #[derive(Debug, thiserror::Error)] pub enum EventError { #[error("Missing Record from event body")] @@ -422,18 +326,6 @@ pub mod error { #[error("Schema Mismatch: {0}")] ObjectStorage(#[from] ObjectStorageError), } - - #[derive(Debug, thiserror::Error)] - pub enum StreamWriterError { - #[error("Arrow writer failed: {0}")] - Writer(#[from] ArrowError), - #[error("Io Error when creating new file: {0}")] - Io(#[from] std::io::Error), - #[error("RwLock was poisoned")] - RwPoisoned, - #[error("Mutex was poisoned")] - MutexPoisoned, - } } #[cfg(test)] diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs new file mode 100644 index 000000000..c69832207 --- /dev/null +++ b/server/src/event/writer.rs @@ -0,0 +1,204 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + +use datafusion::arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch}; +use lazy_static::lazy_static; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::sync::{Mutex, RwLock}; + +use crate::storage::StorageDir; + +use self::errors::StreamWriterError; + +type ArrowWriter = StreamWriter; +type LocalWriter = Mutex>>; + +lazy_static! 
{ + #[derive(Default)] + pub static ref STREAM_WRITERS: RwLock> = RwLock::new(WriterTable::new()); +} + +impl STREAM_WRITERS { + // append to a existing stream + pub fn append_to_local( + stream: &str, + schema_key: &str, + record: &RecordBatch, + ) -> Result<(), StreamWriterError> { + let hashmap_guard = STREAM_WRITERS + .read() + .map_err(|_| StreamWriterError::RwPoisoned)?; + + match hashmap_guard.get(stream, schema_key) { + Some(localwriter) => { + let mut writer_guard = localwriter + .lock() + .map_err(|_| StreamWriterError::MutexPoisoned)?; + + // if it's some writer then we write without dropping any lock + // hashmap cannot be brought mutably at any point until this finishes + if let Some(ref mut writer) = *writer_guard { + writer.write(record).map_err(StreamWriterError::Writer)?; + } else { + // pass on this mutex to set entry so that it can be reused + // we have a guard for underlying entry thus + // hashmap must not be availible as mutable to any other thread + let writer = init_new_stream_writer_file(stream, schema_key, record)?; + writer_guard.replace(writer); // replace the stream writer behind this mutex + } + } + // entry is not present thus we create it + None => { + // this requires mutable borrow of the map so we drop this read lock and wait for write lock + drop(hashmap_guard); + STREAM_WRITERS::create_entry(stream.to_owned(), schema_key.to_owned(), record)?; + } + }; + Ok(()) + } + + // create a new entry with new stream_writer + // Only create entry for valid streams + fn create_entry( + stream: String, + schema_key: String, + record: &RecordBatch, + ) -> Result<(), StreamWriterError> { + let mut hashmap_guard = STREAM_WRITERS + .write() + .map_err(|_| StreamWriterError::RwPoisoned)?; + + let writer = init_new_stream_writer_file(&stream, &schema_key, record)?; + + hashmap_guard.insert(stream, schema_key, Mutex::new(Some(writer))); + + Ok(()) + } + + pub fn delete_stream(stream: &str) { + STREAM_WRITERS.write().unwrap().delete_stream(stream); + } + + pub fn unset_all() -> Result<(), StreamWriterError> { + let table = STREAM_WRITERS + .read() + .map_err(|_| StreamWriterError::RwPoisoned)?; + + for writer in table.iter() { + if let Some(mut streamwriter) = writer + .lock() + .map_err(|_| StreamWriterError::MutexPoisoned)? 
+ .take() + { + let _ = streamwriter.finish(); + } + } + + Ok(()) + } +} + +pub struct WriterTable +where + A: Eq + std::hash::Hash, + B: Eq + std::hash::Hash, + T: Write, +{ + table: HashMap>>, +} + +impl WriterTable +where + A: Eq + std::hash::Hash, + B: Eq + std::hash::Hash, + T: Write, +{ + pub fn new() -> Self { + let table = HashMap::new(); + Self { table } + } + + fn get(&self, a: &X, b: &Y) -> Option<&LocalWriter> + where + A: Borrow, + B: Borrow, + X: Eq + std::hash::Hash + ?Sized, + Y: Eq + std::hash::Hash + ?Sized, + { + self.table.get(a)?.get(b) + } + + fn insert(&mut self, a: A, b: B, v: LocalWriter) { + let inner = self.table.entry(a).or_default(); + inner.insert(b, v); + } + + pub fn delete_stream(&mut self, stream: &X) + where + A: Borrow, + X: Eq + std::hash::Hash + ?Sized, + { + self.table.remove(stream); + } + + fn iter(&self) -> impl Iterator> { + self.table.values().flat_map(|inner| inner.values()) + } +} + +fn init_new_stream_writer_file( + stream_name: &str, + schema_key: &str, + record: &RecordBatch, +) -> Result, StreamWriterError> { + let dir = StorageDir::new(stream_name); + let path = dir.path_by_current_time(schema_key); + + std::fs::create_dir_all(dir.data_path)?; + + let file = OpenOptions::new().create(true).append(true).open(path)?; + + let mut stream_writer = StreamWriter::try_new(file, &record.schema()) + .expect("File and RecordBatch both are checked"); + + stream_writer + .write(record) + .map_err(StreamWriterError::Writer)?; + + Ok(stream_writer) +} + +pub mod errors { + use arrow_schema::ArrowError; + + #[derive(Debug, thiserror::Error)] + pub enum StreamWriterError { + #[error("Arrow writer failed: {0}")] + Writer(#[from] ArrowError), + #[error("Io Error when creating new file: {0}")] + Io(#[from] std::io::Error), + #[error("RwLock was poisoned")] + RwPoisoned, + #[error("Mutex was poisoned")] + MutexPoisoned, + } +} diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index af85d6f60..74eb27e50 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -103,10 +103,12 @@ async fn push_logs( for mut body in array { merge(&mut body, tags_n_metadata.clone().into_iter()); let body = flatten_json_body(&body).unwrap(); + let schema_key = event::get_schema_key(&body); let event = event::Event { body, stream_name: stream_name.clone(), + schema_key: schema_key, }; event.process().await?; @@ -114,10 +116,12 @@ async fn push_logs( } mut body @ Value::Object(_) => { merge(&mut body, tags_n_metadata.into_iter()); - + let body = flatten_json_body(&body).unwrap(); + let schema_key = event::get_schema_key(&body); let event = event::Event { - body: flatten_json_body(&body).unwrap(), + body, stream_name, + schema_key, }; event.process().await?; diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index aad2de257..16e9af9bf 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -25,8 +25,9 @@ use serde_json::Value; use crate::alerts::Alerts; use crate::event; +use crate::metadata::STREAM_INFO; use crate::option::CONFIG; -use crate::storage::{ObjectStorageError, StorageDir}; +use crate::storage::StorageDir; use crate::{metadata, validator}; use self::error::StreamError; @@ -37,19 +38,13 @@ pub async fn delete(req: HttpRequest) -> Result { let objectstore = CONFIG.storage().get_object_store(); - if objectstore.get_schema(&stream_name).await.is_err() { + if !objectstore.stream_exists(&stream_name).await? 
{ return Err(StreamError::StreamNotFound(stream_name.to_string())); } objectstore.delete_stream(&stream_name).await?; metadata::STREAM_INFO.delete_stream(&stream_name); - - if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() { - log::warn!( - "failed to delete log stream event writers for stream {}", - stream_name - ) - } + event::STREAM_WRITERS::delete_stream(&stream_name); let stream_dir = StorageDir::new(&stream_name); if fs::remove_dir_all(&stream_dir.data_path).is_err() { @@ -76,20 +71,11 @@ pub async fn list(_: HttpRequest) -> impl Responder { pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - match metadata::STREAM_INFO.schema(&stream_name) { - Ok(schema) => Ok((web::Json(schema), StatusCode::OK)), - Err(_) => match CONFIG - .storage() - .get_object_store() - .get_schema(&stream_name) - .await - { - Ok(Some(schema)) => Ok((web::Json(Some(schema)), StatusCode::OK)), - Ok(None) => Err(StreamError::UninitializedLogstream), - Err(ObjectStorageError::NoSuchKey(_)) => Err(StreamError::StreamNotFound(stream_name)), - Err(err) => Err(err.into()), - }, - } + let schemas = STREAM_INFO + .schema_map(&stream_name) + .map_err(|_| StreamError::StreamNotFound(stream_name.to_owned()))?; + + Ok((web::Json(schemas), StatusCode::OK)) } pub async fn get_alert(req: HttpRequest) -> Result { @@ -164,19 +150,21 @@ pub async fn put_alert( validator::alert(&alerts)?; - match metadata::STREAM_INFO.schema(&stream_name) { - Ok(Some(schema)) => { - let invalid_alert = alerts - .alerts - .iter() - .find(|alert| !alert.rule.valid_for_schema(&schema)); + let schemas = STREAM_INFO + .schema_map(&stream_name) + .map_err(|_| StreamError::StreamNotFound(stream_name.to_owned()))?; - if let Some(alert) = invalid_alert { - return Err(StreamError::InvalidAlert(alert.name.to_string())); - } + if schemas.len() == 0 { + return Err(StreamError::UninitializedLogstream); + } + + for alert in &alerts.alerts { + if !schemas + .values() + .any(|schema| alert.rule.valid_for_schema(schema)) + { + return Err(StreamError::InvalidAlert(alert.name.to_owned())); } - Ok(None) => return Err(StreamError::UninitializedLogstream), - Err(_) => return Err(StreamError::StreamNotFound(stream_name)), } CONFIG @@ -255,7 +243,7 @@ pub async fn create_stream(stream_name: String) -> Result<(), StreamError> { status: StatusCode::INTERNAL_SERVER_ERROR, }); } - metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default()); + metadata::STREAM_INFO.add_stream(stream_name.to_string()); Ok(()) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 075755bbd..dfef749b6 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -28,13 +28,7 @@ use crate::storage::ObjectStorage; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; -#[derive(Debug, Default)] -pub struct LogStreamMetadata { - pub schema: Option, - pub alerts: Alerts, - pub stats: StatsCounter, -} - +// TODO: make return type be of 'static lifetime instead of cloning lazy_static! { #[derive(Debug)] // A read-write lock to allow multiple reads while and isolated write @@ -42,6 +36,13 @@ lazy_static! { RwLock::new(HashMap::new()); } +#[derive(Debug, Default)] +pub struct LogStreamMetadata { + pub schema: HashMap, + pub alerts: Alerts, + pub stats: StatsCounter, +} + // It is very unlikely that panic will occur when dealing with metadata. 
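The reshaped LogStreamMetadata above is the core of this change: a stream no longer carries a single optional schema but a map from schema key to schema, and reads such as STREAM_INFO's schema() are now addressed by the (stream_name, schema_key) pair. A hedged sketch of that shape in isolation; the StreamMeta struct and the key strings are stand-ins, not the crate's actual types:

    use std::collections::HashMap;
    use arrow_schema::{DataType, Field, Schema};

    // Illustrative stand-in for one stream's metadata entry.
    struct StreamMeta {
        // schema_key -> schema describing that field set
        schema: HashMap<String, Schema>,
    }

    fn main() {
        let mut meta = StreamMeta { schema: HashMap::new() };
        // The key would normally be the md5-derived schema key.
        meta.schema.insert(
            "example-schema-key".to_string(),
            Schema::new(vec![Field::new("msg", DataType::Utf8, true)]),
        );

        // A lookup now needs both the stream entry and the schema key.
        assert!(meta.schema.get("example-schema-key").is_some());
        // A missing key simply means "first event of this shape": no schema yet.
        assert!(meta.schema.get("some-other-key").is_none());
    }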
pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding a lock"; @@ -68,26 +69,33 @@ impl STREAM_INFO { Ok(()) } - #[allow(dead_code)] - pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), MetadataError> { - let mut map = self.write().expect(LOCK_EXPECT); - map.get_mut(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| { - metadata.schema.replace(schema); - }) - } - pub fn stream_exists(&self, stream_name: &str) -> bool { let map = self.read().expect(LOCK_EXPECT); map.contains_key(stream_name) } - pub fn schema(&self, stream_name: &str) -> Result, MetadataError> { + pub fn schema( + &self, + stream_name: &str, + schema_key: &str, + ) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); - map.get(stream_name) + let schemas = map + .get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.schema.to_owned()) + .map(|metadata| &metadata.schema)?; + + Ok(schemas.get(schema_key).cloned()) + } + + pub fn schema_map(&self, stream_name: &str) -> Result, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + let schemas = map + .get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.schema.clone())?; + + Ok(schemas) } pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { @@ -99,11 +107,9 @@ impl STREAM_INFO { }) } - pub fn add_stream(&self, stream_name: String, schema: Option, alerts: Alerts) { + pub fn add_stream(&self, stream_name: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { - schema, - alerts, ..Default::default() }; map.insert(stream_name, metadata); @@ -122,7 +128,7 @@ impl STREAM_INFO { for stream in storage.list_streams().await? 
{ let alerts = storage.get_alerts(&stream.name).await?; - let schema = storage.get_schema(&stream.name).await?; + let schema = storage.get_schema_map(&stream.name).await?; let stats = storage.get_stats(&stream.name).await?; let metadata = LogStreamMetadata { diff --git a/server/src/query.rs b/server/src/query.rs index b6c5ae20b..3e09e695c 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -174,7 +174,7 @@ pub mod error { #[cfg(test)] mod tests { use super::{time_from_path, Query}; - use crate::{alerts::Alerts, metadata::STREAM_INFO}; + use crate::metadata::STREAM_INFO; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::datatypes::{DataType, Field}; use rstest::*; @@ -222,7 +222,7 @@ mod tests { #[serial_test::serial] fn query_parse_prefix_with_some_schema(#[case] prefix: &str, #[case] right: &[&str]) { clear_map(); - STREAM_INFO.add_stream("stream_name".to_string(), Some(schema()), Alerts::default()); + STREAM_INFO.add_stream("stream_name".to_string()); let query = Value::from_str(prefix).unwrap(); let query = Query::parse(query).unwrap(); @@ -244,7 +244,7 @@ mod tests { #[serial_test::serial] fn query_parse_prefix_with_no_schema(#[case] prefix: &str) { clear_map(); - STREAM_INFO.add_stream("stream_name".to_string(), None, Alerts::default()); + STREAM_INFO.add_stream("stream_name".to_string()); let query = Value::from_str(prefix).unwrap(); assert!(Query::parse(query).is_err()); diff --git a/server/src/storage.rs b/server/src/storage.rs index ae6da4ed3..bbf085d0f 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -205,7 +205,7 @@ impl StorageDir { Self { data_path } } - fn filename_by_time(time: NaiveDateTime) -> String { + fn file_time_suffix(time: NaiveDateTime) -> String { let uri = utils::date_to_prefix(time.date()) + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); @@ -214,13 +214,18 @@ impl StorageDir { format!("{local_uri}{hostname}.data.arrows") } - fn filename_by_current_time() -> String { + fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { + format!("{}.{}", stream_hash, Self::file_time_suffix(time)) + } + + fn filename_by_current_time(stream_hash: &str) -> String { let datetime = Utc::now(); - Self::filename_by_time(datetime.naive_utc()) + Self::filename_by_time(stream_hash, datetime.naive_utc()) } - pub fn path_by_current_time(&self) -> PathBuf { - self.data_path.join(Self::filename_by_current_time()) + pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf { + self.data_path + .join(Self::filename_by_current_time(stream_hash)) } pub fn arrow_files(&self) -> Vec { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index c946ec302..887ba656d 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -69,13 +69,16 @@ pub trait ObjectStorage: Sync + 'static { async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; fn query_table(&self, query: &Query) -> Result, ObjectStorageError>; - async fn put_schema( + async fn put_schema_map( &self, stream_name: &str, - schema: &Schema, + schema_map: &str, ) -> Result<(), ObjectStorageError> { - self.put_object(&schema_path(stream_name), to_bytes(schema)) - .await?; + self.put_object( + &schema_path(stream_name), + Bytes::copy_from_slice(schema_map.as_bytes()), + ) + .await?; Ok(()) } @@ -88,8 +91,12 @@ pub trait ObjectStorage: Sync + 'static { let format_json = to_bytes(&format); - 
self.put_object(&schema_path(stream_name), "".into()) - .await?; + self.put_object( + &schema_path(stream_name), + to_bytes(&HashMap::::new()), + ) + .await?; + self.put_object(&stream_json_path(stream_name), format_json) .await?; @@ -125,9 +132,12 @@ pub trait ObjectStorage: Sync + 'static { .await } - async fn get_schema(&self, stream_name: &str) -> Result, ObjectStorageError> { + async fn get_schema_map( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError> { let schema = self.get_object(&schema_path(stream_name)).await?; - let schema = serde_json::from_slice(&schema).ok(); + let schema = serde_json::from_slice(&schema).expect("schema map is valid json"); Ok(schema) } @@ -171,6 +181,16 @@ pub trait ObjectStorage: Sync + 'static { Ok(parseable_metadata) } + async fn stream_exists(&self, stream_name: &str) -> Result { + let res = self.get_object(&stream_json_path(stream_name)).await; + + match res { + Ok(_) => Ok(true), + Err(ObjectStorageError::NoSuchKey(_)) => Ok(false), + Err(e) => Err(e), + } + } + async fn sync(&self) -> Result<(), MoveDataError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); @@ -187,14 +207,16 @@ pub trait ObjectStorage: Sync + 'static { let mut arrow_files = dir.arrow_files(); // Do not include file which is being written to - let hot_file = dir.path_by_current_time(); - let hot_filename = hot_file.file_name().expect("is a not none filename"); + let time = chrono::Utc::now().naive_utc(); + let hot_filename = StorageDir::file_time_suffix(time); arrow_files.retain(|file| { !file .file_name() .expect("is a not none filename") - .eq(hot_filename) + .to_str() + .unwrap() + .ends_with(&hot_filename) }); for file in arrow_files { diff --git a/server/src/validator.rs b/server/src/validator.rs index 81ebb80a9..4d958b365 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -16,8 +16,6 @@ * */ -use std::sync::Arc; - use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; @@ -160,18 +158,21 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result Arc::new(schema), - None => return Err(QueryValidationError::UninitializedStream), - }; - - Ok(Query { - stream_name: tokens[stream_name_index].to_string(), - start, - end, - query: query.to_string(), - schema, - }) + let schemas = STREAM_INFO.schema_map(&stream_name)?; + + if schemas.len() == 0 { + return Err(QueryValidationError::UninitializedStream); + } + + // Ok(Query { + // stream_name: tokens[stream_name_index].to_string(), + // start, + // end, + // query: query.to_string(), + // schema, + // }); + + todo!(); } pub mod error { From e6b0ad8e42f952abd53169c9c76906694ded71f3 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 27 Jan 2023 13:40:07 +0530 Subject: [PATCH 02/22] Fix Query --- server/src/query.rs | 100 +++++++++------------------------- server/src/storage/localfs.rs | 2 +- server/src/storage/s3.rs | 2 +- server/src/utils.rs | 47 +++++++--------- server/src/validator.rs | 16 +++--- 5 files changed, 57 insertions(+), 110 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index 3e09e695c..fe738f7e5 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -25,7 +25,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; use datafusion::prelude::*; use serde_json::Value; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::Path; use 
std::path::PathBuf; use std::sync::Arc; @@ -49,7 +49,7 @@ fn get_value(value: &Value, key: Key) -> Result<&str, Key> { pub struct Query { pub query: String, pub stream_name: String, - pub schema: Arc, + pub schemas: HashMap, pub start: DateTime, pub end: DateTime, } @@ -67,9 +67,29 @@ impl Query { } /// Return prefixes, each per day/hour/minutes as necessary + fn _get_prefixes(&self) -> Vec { + TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes() + } + pub fn get_prefixes(&self) -> Vec { - TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY) - .generate_prefixes(&self.stream_name) + let datetime_prefixes = self._get_prefixes(); + let mut res = Vec::new(); + + for schema_key in self.schemas.keys() { + let prefix = format!("{}/{}", self.stream_name, schema_key); + + res.extend( + datetime_prefixes + .iter() + .map(|datetime_prefix| format!("{}/{}", prefix, datetime_prefix)), + ) + } + + res + } + + pub fn get_schema(&self) -> Schema { + Schema::try_merge(self.schemas.clone().into_values()).expect("mergable shcemas") } /// Execute query on object storage(and if necessary on cache as well) with given stream information @@ -109,8 +129,9 @@ impl Query { arrow_files, parquet_files, storage.query_table(self)?, - Arc::clone(&self.schema), + Arc::new(self.get_schema()), )); + ctx.register_table( &*self.stream_name, Arc::clone(&table) as Arc, @@ -173,14 +194,8 @@ pub mod error { #[cfg(test)] mod tests { - use super::{time_from_path, Query}; - use crate::metadata::STREAM_INFO; - use datafusion::arrow::datatypes::Schema; - use datafusion::arrow::datatypes::{DataType, Field}; - use rstest::*; - use serde_json::Value; + use super::time_from_path; use std::path::PathBuf; - use std::str::FromStr; #[test] fn test_time_from_parquet_path() { @@ -188,65 +203,4 @@ mod tests { let time = time_from_path(path.as_path()); assert_eq!(time.timestamp(), 1640995200); } - - // Query prefix generation tests - #[fixture] - fn schema() -> Schema { - let field_a = Field::new("a", DataType::Int64, false); - let field_b = Field::new("b", DataType::Boolean, false); - Schema::new(vec![field_a, field_b]) - } - - fn clear_map() { - STREAM_INFO.write().unwrap().clear(); - } - - // A query can only be performed on streams with a valid schema - #[rstest] - #[case( - r#"{ - "query": "SELECT * FROM stream_name", - "startTime": "2022-10-15T10:00:00+00:00", - "endTime": "2022-10-15T10:01:00+00:00" - }"#, - &["stream_name/date=2022-10-15/hour=10/minute=00/"] - )] - #[case( - r#"{ - "query": "SELECT * FROM stream_name", - "startTime": "2022-10-15T10:00:00+00:00", - "endTime": "2022-10-15T10:02:00+00:00" - }"#, - &["stream_name/date=2022-10-15/hour=10/minute=00/", "stream_name/date=2022-10-15/hour=10/minute=01/"] - )] - #[serial_test::serial] - fn query_parse_prefix_with_some_schema(#[case] prefix: &str, #[case] right: &[&str]) { - clear_map(); - STREAM_INFO.add_stream("stream_name".to_string()); - - let query = Value::from_str(prefix).unwrap(); - let query = Query::parse(query).unwrap(); - assert_eq!(&query.stream_name, "stream_name"); - let prefixes = query.get_prefixes(); - let left = prefixes.iter().map(String::as_str).collect::>(); - assert_eq!(left.as_slice(), right); - } - - // If there is no schema for this stream then parsing a Query should fail - #[rstest] - #[case( - r#"{ - "query": "SELECT * FROM stream_name", - "startTime": "2022-10-15T10:00:00+00:00", - "endTime": "2022-10-15T10:01:00+00:00" - }"# - )] - #[serial_test::serial] - fn query_parse_prefix_with_no_schema(#[case] 
prefix: &str) { - clear_map(); - STREAM_INFO.add_stream("stream_name".to_string()); - - let query = Value::from_str(prefix).unwrap(); - assert!(Query::parse(query).is_err()); - } } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 277db8601..9eca3ddfd 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -191,7 +191,7 @@ impl ObjectStorage for LocalFS { let config = ListingTableConfig::new_with_multi_paths(prefixes) .with_listing_options(listing_options) - .with_schema(Arc::clone(&query.schema)); + .with_schema(Arc::new(query.get_schema())); Ok(Some(ListingTable::try_new(config)?)) } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 1ba6adc1e..561d88109 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -345,7 +345,7 @@ impl ObjectStorage for S3 { let config = ListingTableConfig::new_with_multi_paths(prefixes) .with_listing_options(listing_options) - .with_schema(Arc::clone(&query.schema)); + .with_schema(Arc::new(query.get_schema())); Ok(Some(ListingTable::try_new(config)?)) } diff --git a/server/src/utils.rs b/server/src/utils.rs index c49ba61a2..5eef313f7 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -271,13 +271,9 @@ impl TimePeriod { } } - pub fn generate_prefixes(&self, prefix: &str) -> Vec { - let prefix = format!("{prefix}/"); - + pub fn generate_prefixes(&self) -> Vec { let end_minute = self.end.minute() + u32::from(self.end.second() > 0); - self.generate_date_prefixes( - &prefix, self.start.date_naive(), self.end.date_naive(), (self.start.hour(), self.start.minute()), @@ -371,7 +367,6 @@ impl TimePeriod { pub fn generate_date_prefixes( &self, - prefix: &str, start_date: NaiveDate, end_date: NaiveDate, start_time: (u32, u32), @@ -381,7 +376,7 @@ impl TimePeriod { let mut date = start_date; while date <= end_date { - let prefix = prefix.to_owned() + &date_to_prefix(date); + let prefix = date_to_prefix(date); let is_start = date == start_date; let is_end = date == end_date; @@ -426,66 +421,66 @@ mod tests { #[rstest] #[case::same_minute( "2022-06-11T16:30:00+00:00", "2022-06-11T16:30:59+00:00", - &["stream_name/date=2022-06-11/hour=16/minute=30/"] + &["date=2022-06-11/hour=16/minute=30/"] )] #[case::same_hour_different_minute( "2022-06-11T16:57:00+00:00", "2022-06-11T16:59:00+00:00", &[ - "stream_name/date=2022-06-11/hour=16/minute=57/", - "stream_name/date=2022-06-11/hour=16/minute=58/" + "date=2022-06-11/hour=16/minute=57/", + "date=2022-06-11/hour=16/minute=58/" ] )] #[case::same_hour_with_00_to_59_minute_block( "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00", - &["stream_name/date=2022-06-11/hour=16/"] + &["date=2022-06-11/hour=16/"] )] #[case::same_date_different_hours_coherent_minute( "2022-06-11T15:00:00+00:00", "2022-06-11T17:00:00+00:00", &[ - "stream_name/date=2022-06-11/hour=15/", - "stream_name/date=2022-06-11/hour=16/" + "date=2022-06-11/hour=15/", + "date=2022-06-11/hour=16/" ] )] #[case::same_date_different_hours_incoherent_minutes( "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00", &[ - "stream_name/date=2022-06-11/hour=15/minute=59/", - "stream_name/date=2022-06-11/hour=16/minute=00/" + "date=2022-06-11/hour=15/minute=59/", + "date=2022-06-11/hour=16/minute=00/" ] )] #[case::same_date_different_hours_whole_hours_between_incoherent_minutes( "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00", &[ - "stream_name/date=2022-06-11/hour=15/minute=59/", - "stream_name/date=2022-06-11/hour=16/", - 
"stream_name/date=2022-06-11/hour=17/minute=00/" + "date=2022-06-11/hour=15/minute=59/", + "date=2022-06-11/hour=16/", + "date=2022-06-11/hour=17/minute=00/" ] )] #[case::different_date_coherent_hours_and_minutes( "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00", &[ - "stream_name/date=2022-06-11/", - "stream_name/date=2022-06-12/" + "date=2022-06-11/", + "date=2022-06-12/" ] )] #[case::different_date_incoherent_hours_coherent_minutes( "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00", &[ - "stream_name/date=2022-06-11/hour=23/", - "stream_name/date=2022-06-12/hour=00/", - "stream_name/date=2022-06-12/hour=01/" + "date=2022-06-11/hour=23/", + "date=2022-06-12/hour=00/", + "date=2022-06-12/hour=01/" ] )] #[case::different_date_incoherent_hours_incoherent_minutes( "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00", &[ - "stream_name/date=2022-06-11/hour=23/minute=59/", - "stream_name/date=2022-06-12/hour=00/minute=00/" + "date=2022-06-11/hour=23/minute=59/", + "date=2022-06-12/hour=00/minute=00/" ] )] fn prefix_generation(#[case] start: &str, #[case] end: &str, #[case] right: &[&str]) { let time_period = time_period_from_str(start, end); - let prefixes = time_period.generate_prefixes("stream_name"); + let prefixes = time_period.generate_prefixes(); let left = prefixes.iter().map(String::as_str).collect::>(); assert_eq!(left.as_slice(), right); } diff --git a/server/src/validator.rs b/server/src/validator.rs index 4d958b365..f791b79d6 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -164,15 +164,13 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result Date: Sat, 28 Jan 2023 11:45:30 +0530 Subject: [PATCH 03/22] Fix time parse --- server/src/query.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index fe738f7e5..5dbf5c84e 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -156,10 +156,22 @@ fn time_from_path(path: &Path) -> DateTime { .to_str() .expect("filename is valid"); - // substring of filename i.e date=xxxx.hour=xx.minute=xx - let prefix = &prefix[..33]; - Utc.datetime_from_str(prefix, "date=%F.hour=%H.minute=%M") - .expect("valid prefix is parsed") + // split by . and skip first part because that is schema key. 
+ // Next three in order will be date, hour and minute + let mut components = prefix.splitn(4, '.').skip(1); + + let date = components.next().expect("date=xxxx-xx-xx"); + let hour = components.next().expect("hour=xx"); + let minute = components.next().expect("minute=xx"); + + let year = date[5..9].parse().unwrap(); + let month = date[10..12].parse().unwrap(); + let day = date[13..15].parse().unwrap(); + let hour = hour[5..7].parse().unwrap(); + let minute = minute[7..9].parse().unwrap(); + + Utc.with_ymd_and_hms(year, month, day, hour, minute, 0) + .unwrap() } pub mod error { @@ -199,7 +211,9 @@ mod tests { #[test] fn test_time_from_parquet_path() { - let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet"); + let path = PathBuf::from( + "schema_key_rlength.date=2022-01-01.hour=00.minute=00.hostname.data.parquet", + ); let time = time_from_path(path.as_path()); assert_eq!(time.timestamp(), 1640995200); } From ebe63f9e2184157ff8c3effe35dc7f4cbda09271 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 4 Feb 2023 15:05:19 +0530 Subject: [PATCH 04/22] Refactor --- server/Cargo.toml | 2 +- server/src/event.rs | 169 ++++++++++++--------------- server/src/metadata.rs | 11 +- server/src/query.rs | 5 +- server/src/storage/object_storage.rs | 2 +- 5 files changed, 88 insertions(+), 101 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index d7326a70e..60c2630fa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -49,7 +49,7 @@ rustls = "0.20.6" rustls-pemfile = "1.0.1" rust-flatten-json = "0.2.0" semver = "1.0.14" -serde = "^1.0.8" +serde = { version = "1.0", features = ["rc"] } serde_derive = "^1.0.8" serde_json = "^1.0.8" thiserror = "1" diff --git a/server/src/event.rs b/server/src/event.rs index 9f7404bf5..2cf5bc8a0 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -18,7 +18,6 @@ mod writer; * * */ -use actix_web::rt::spawn; use arrow_schema::{DataType, Field, TimeUnit}; use chrono::{DateTime, Utc}; use datafusion::arrow::array::{Array, TimestampMillisecondArray}; @@ -30,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch; use md5::Digest; use serde_json::Value; use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; +use std::ops::DerefMut; use std::sync::Arc; use crate::metadata; @@ -55,9 +54,8 @@ impl Event { pub async fn process(self) -> Result<(), EventError> { let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name, &self.schema_key)?; if let Some(schema) = stream_schema { - let schema_ref = Arc::new(schema); // validate schema before processing the event - let Ok(mut event) = self.get_record(Arc::clone(&schema_ref)) else { + let Ok(mut event) = self.get_record(Arc::clone(&schema)) else { return Err(EventError::SchemaMismatch); }; @@ -68,7 +66,7 @@ impl Event { { let rows = event.num_rows(); let timestamp_array = Arc::new(get_timestamp_array(rows)); - event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); + event = replace(schema, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); } self.process_event(&event)?; @@ -79,14 +77,7 @@ impl Event { let time_field = get_datetime_field(&self.body); if time_field.is_none() { - let schema = Schema::try_merge(vec![ - Schema::new(vec![Field::new( - DEFAULT_TIMESTAMP_KEY, - DataType::Timestamp(TimeUnit::Millisecond, None), - true, - )]), - self.infer_schema()?, - ])?; + let schema = add_default_timestamp_field(self.infer_schema()?)?; let schema_ref = Arc::new(schema.clone()); let event = self.get_record(schema_ref.clone())?; let timestamp_array 
= Arc::new(get_timestamp_array(event.num_rows())); @@ -127,61 +118,8 @@ impl Event { let stream_name = &self.stream_name; let schema_key = &self.schema_key; - let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT); - // if the metadata is not none after acquiring lock - // then some other thread has already completed this function. - if _schema_with_map(stream_name, schema_key, &stream_metadata).is_some() { - // drop the lock - drop(stream_metadata); - // Try to post event usual way - log::info!("first event is redirected to process_event"); - self.process_event(event) - } else { - // stream metadata is still none, - // this means this execution should be considered as first event. - - // Store record batch on local cache - log::info!("creating local writer for this first event"); - self.process_event(event)?; - - log::info!("schema is set in memory map for logstream {}", stream_name); - _set_schema_with_map(stream_name, schema_key, schema, &mut stream_metadata); - - let schema_map = serde_json::to_string( - &stream_metadata - .get(stream_name) - .expect("map has entry for this stream name") - .schema, - ) - .expect("map of schemas is serializable"); - // drop mutex before going across await point - drop(stream_metadata); - - log::info!( - "setting schema on objectstore for logstream {}", - stream_name - ); - let storage = CONFIG.storage().get_object_store(); - - let stream_name = stream_name.clone(); - spawn(async move { - if let Err(e) = storage.put_schema_map(&stream_name, &schema_map).await { - // If this call has failed then currently there is no right way to make local state consistent - // this needs a fix after more constraints are safety guarentee is provided by localwriter and objectstore_sync. - // Reasoning - - // - After dropping lock many events may process through - // - Processed events may sync before metadata deletion - log::error!( - "Parseable failed to upload schema to objectstore due to error {}", - e - ); - log::error!("Please manually delete this logstream and create a new one."); - metadata::STREAM_INFO.delete_stream(&stream_name); - } - }); - - Ok(()) - } + commit_schema(stream_name, schema_key, Arc::new(schema))?; + self.process_event(event) } // event process all events after the 1st event. Concatenates record batches @@ -209,6 +147,19 @@ impl Event { } } +fn add_default_timestamp_field(schema: Schema) -> Result { + let schema = Schema::try_merge(vec![ + Schema::new(vec![Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + )]), + schema, + ])?; + + Ok(schema) +} + pub fn get_schema_key(body: &Value) -> String { let mut list_of_fields: Vec<_> = body.as_object().unwrap().keys().collect(); list_of_fields.sort(); @@ -244,6 +195,45 @@ fn fields_mismatch(schema: &Schema, body: &Value) -> bool { false } +fn commit_schema( + stream_name: &str, + schema_key: &str, + schema: Arc, +) -> Result<(), EventError> { + // note for methods .get_unchecked and .set_unchecked, + // these are to be called while holding a write lock specifically. + // this guarantees two things + // - no other metadata operation can happen in between + // - map always have an entry for this stream + + let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT); + // if the metadata is not none after acquiring lock + // then some other thread has already completed this function. 
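In other words, commit_schema is a double-checked insert: the caller's earlier read found no schema for this key, so the function takes the write lock, re-checks, and only the winner serializes the schema map, uploads it, and records the new entry. The same pattern in miniature, with plain standard-library types standing in for the real metadata map; commit_once and its string values are illustrative only:

    use std::collections::HashMap;
    use std::sync::RwLock;

    // Double-checked insert: only the thread that wins the write lock
    // persists the schema; later callers see the entry and return early.
    fn commit_once(map: &RwLock<HashMap<String, String>>, key: &str, schema: String) {
        {
            let read_guard = map.read().unwrap();
            if read_guard.contains_key(key) {
                return; // somebody else already committed this schema key
            }
        } // read lock released before requesting the write lock

        let mut write_guard = map.write().unwrap();
        if !write_guard.contains_key(key) {
            // the real code serializes the whole schema map and uploads it
            // to object storage at this point, before inserting locally
            write_guard.insert(key.to_string(), schema);
        }
    }

    fn main() {
        let map = RwLock::new(HashMap::new());
        commit_once(&map, "key_a", "schema json".to_string());
        commit_once(&map, "key_a", "schema json".to_string()); // no-op, already present
        assert_eq!(map.read().unwrap().len(), 1);
    }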
+ if stream_metadata + .get_unchecked(stream_name, schema_key) + .is_some() + { + // drop the lock + drop(stream_metadata); + // Nothing to do + return Ok(()); + } else { + let schema_map = serde_json::to_string( + &stream_metadata + .get(stream_name) + .expect("map has entry for this stream name") + .schema, + ) + .expect("map of schemas is serializable"); + + let storage = CONFIG.storage().get_object_store(); + futures::executor::block_on(storage.put_schema_map(&stream_name, &schema_map))?; + stream_metadata.set_unchecked(stream_name, schema_key, schema); + } + + Ok(()) +} + fn replace( schema: Arc, batch: RecordBatch, @@ -276,34 +266,27 @@ fn get_datetime_field(json: &Value) -> Option<&str> { None } -// Special functions which reads from metadata map while holding the lock -#[inline] -pub fn _schema_with_map( - stream_name: &str, - schema_key: &str, - map: &impl Deref>, -) -> Option { - map.get(stream_name) - .expect("map has entry for this stream name") - .schema - .get(schema_key) - .cloned() -} +trait UncheckedOp: DerefMut> { + fn get_unchecked(&self, stream_name: &str, schema_key: &str) -> Option> { + self.get(stream_name) + .expect("map has entry for this stream name") + .schema + .get(schema_key) + .cloned() + } -#[inline] -// Special functions which writes to metadata map while holding the lock -pub fn _set_schema_with_map( - stream_name: &str, - schema_key: &str, - schema: Schema, - map: &mut impl DerefMut>, -) { - map.get_mut(stream_name) - .expect("map has entry for this stream name") - .schema - .insert(schema_key.to_string(), schema); + fn set_unchecked(&mut self, stream_name: &str, schema_key: &str, schema: Arc) { + self.get_mut(stream_name) + .expect("map has entry for this stream name") + .schema + .insert(schema_key.to_string(), schema) + .is_some() + .then(|| panic!("collision")); + } } +impl>> UncheckedOp for T {} + pub mod error { use crate::metadata::error::stream_info::MetadataError; use crate::storage::ObjectStorageError; diff --git a/server/src/metadata.rs b/server/src/metadata.rs index dfef749b6..7259f48de 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -19,7 +19,7 @@ use datafusion::arrow::datatypes::Schema; use lazy_static::lazy_static; use std::collections::HashMap; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use crate::alerts::Alerts; use crate::event::Event; @@ -38,7 +38,7 @@ lazy_static! 
{ #[derive(Debug, Default)] pub struct LogStreamMetadata { - pub schema: HashMap, + pub schema: HashMap>, pub alerts: Alerts, pub stats: StatsCounter, } @@ -78,7 +78,7 @@ impl STREAM_INFO { &self, stream_name: &str, schema_key: &str, - ) -> Result, MetadataError> { + ) -> Result>, MetadataError> { let map = self.read().expect(LOCK_EXPECT); let schemas = map .get(stream_name) @@ -88,7 +88,10 @@ impl STREAM_INFO { Ok(schemas.get(schema_key).cloned()) } - pub fn schema_map(&self, stream_name: &str) -> Result, MetadataError> { + pub fn schema_map( + &self, + stream_name: &str, + ) -> Result>, MetadataError> { let map = self.read().expect(LOCK_EXPECT); let schemas = map .get(stream_name) diff --git a/server/src/query.rs b/server/src/query.rs index 5dbf5c84e..89db27d92 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -49,7 +49,7 @@ fn get_value(value: &Value, key: Key) -> Result<&str, Key> { pub struct Query { pub query: String, pub stream_name: String, - pub schemas: HashMap, + pub schemas: HashMap>, pub start: DateTime, pub end: DateTime, } @@ -89,7 +89,8 @@ impl Query { } pub fn get_schema(&self) -> Schema { - Schema::try_merge(self.schemas.clone().into_values()).expect("mergable shcemas") + Schema::try_merge(self.schemas.values().map(|schema| schema.as_ref()).cloned()) + .expect("mergable shcemas") } /// Execute query on object storage(and if necessary on cache as well) with given stream information diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 887ba656d..646e4646a 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -135,7 +135,7 @@ pub trait ObjectStorage: Sync + 'static { async fn get_schema_map( &self, stream_name: &str, - ) -> Result, ObjectStorageError> { + ) -> Result>, ObjectStorageError> { let schema = self.get_object(&schema_path(stream_name)).await?; let schema = serde_json::from_slice(&schema).expect("schema map is valid json"); Ok(schema) From 09956dd4eb7d0f41cbdc8024545095f8b234b194 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 6 Feb 2023 11:51:34 +0530 Subject: [PATCH 05/22] Add MergedRecordReader based on default timestamp --- server/src/storage/object_storage.rs | 115 ++++++++++++++++++++++----- 1 file changed, 96 insertions(+), 19 deletions(-) diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 646e4646a..e3bccfcb9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -26,11 +26,15 @@ use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{ - arrow::ipc::reader::StreamReader, + arrow::{ + array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch, + }, datasource::listing::ListingTable, execution::runtime_env::RuntimeEnv, parquet::{arrow::ArrowWriter, file::properties::WriterProperties}, }; +use itertools::kmerge_by; +use lazy_static::__Deref; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde::Serialize; @@ -39,7 +43,7 @@ use serde_json::Value; use std::{ collections::HashMap, fs::{self, File}, - path::Path, + path::{Path, PathBuf}, sync::Arc, }; @@ -205,11 +209,12 @@ pub trait ObjectStorage: Sync + 'static { let dir = StorageDir::new(stream); // walk dir, find all .arrows files and convert to parquet - let mut arrow_files = dir.arrow_files(); // Do not include file which is being written to let time = chrono::Utc::now().naive_utc(); let hot_filename = StorageDir::file_time_suffix(time); 
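file_time_suffix produces the date=.../hour=.../minute=... portion that every schema's staging file shares for the current time slot; the full staging filename has the form {schema_key}.date=YYYY-MM-DD.hour=HH.minute=MM.{hostname}.data.arrows (the patch 03 test shows the same layout for the Parquet name). Grouping by everything after the first dot is what lets arrow files written for different schema keys in the same slot be merged into one Parquet file, with missing columns padded with nulls by adapt_batch in a later commit of this series. A simplified sketch of that grouping rule, with made-up schema keys and hostname:

    // Two staging files from the same minute but different schema keys:
    //   aaaa.date=2022-10-15.hour=10.minute=00.host1.data.arrows
    //   bbbb.date=2022-10-15.hour=10.minute=00.host1.data.arrows
    // Everything after the first '.' is the shared time suffix, so both
    // files fall into the same group and hence the same Parquet file.
    fn time_suffix(filename: &str) -> &str {
        filename.split_once('.').map(|(_, rest)| rest).unwrap_or(filename)
    }

    fn main() {
        let a = "aaaa.date=2022-10-15.hour=10.minute=00.host1.data.arrows";
        let b = "bbbb.date=2022-10-15.hour=10.minute=00.host1.data.arrows";
        assert_eq!(time_suffix(a), time_suffix(b));
    }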
+ let mut arrow_files = dir.arrow_files(); + arrow_files.retain(|file| { !file .file_name() @@ -219,35 +224,48 @@ pub trait ObjectStorage: Sync + 'static { .ends_with(&hot_filename) }); - for file in arrow_files { - let arrow_file = File::open(&file).map_err(|_| MoveDataError::Open)?; - let reader = StreamReader::try_new(arrow_file, None)?; - let schema = reader.schema(); - let records = reader.filter_map(|record| match record { - Ok(record) => Some(record), - Err(e) => { - log::warn!("warning from arrow stream {:?}", e); - None - } - }); + // hashmap time = vec[paths] + let mut grouped_arrow_file: HashMap<&str, Vec<&Path>> = HashMap::new(); + + for file in &arrow_files { + let key = file + .file_name() + .expect("is a not none filename") + .to_str() + .unwrap(); + + let (_, key) = key.split_once('.').unwrap(); + + let key = key.strip_suffix("data.arrows").unwrap(); + + grouped_arrow_file.entry(key).or_default().push(file); + } + + for files in grouped_arrow_file.values() { + let parquet_path = get_parquet_path(files[0]); + let record_reader = MergedRecordReader::try_new(files).unwrap(); - let mut parquet_path = file.clone(); - parquet_path.set_extension("parquet"); let mut parquet_table = CACHED_FILES.lock().unwrap(); let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; parquet_table.upsert(&parquet_path); let props = WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?; + let mut writer = ArrowWriter::try_new( + parquet_file, + Arc::new(record_reader.merged_schema()), + Some(props), + )?; - for ref record in records { + for ref record in record_reader.merged_iter() { writer.write(record)?; } writer.close()?; - fs::remove_file(file).map_err(|_| MoveDataError::Delete)?; + for file in files { + let _ = fs::remove_file(file); + } } for file in dir.parquet_files() { @@ -325,6 +343,65 @@ pub trait ObjectStorage: Sync + 'static { } } +fn get_parquet_path(file: &Path) -> PathBuf { + let filename = file.file_name().unwrap().to_str().unwrap(); + let (_, filename) = filename.split_once('.').unwrap(); + + let mut parquet_path = file.to_owned(); + parquet_path.set_file_name(filename); + parquet_path.set_extension("parquet"); + + parquet_path +} + +#[derive(Debug)] +struct MergedRecordReader { + readers: Vec>, +} + +impl MergedRecordReader { + fn try_new(files: &[&Path]) -> Result { + let mut readers = Vec::with_capacity(files.len()); + + for file in files { + let reader = StreamReader::try_new(File::open(file).unwrap(), None)?; + readers.push(reader); + } + + Ok(Self { readers }) + } + + fn merged_iter(self) -> impl Iterator { + kmerge_by( + self.readers.into_iter().map(|reader| reader.flatten()), + |a: &RecordBatch, b: &RecordBatch| { + let a: &TimestampMillisecondArray = a + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + let b: &TimestampMillisecondArray = b + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + a.value(0) < b.value(0) + }, + ) + } + + fn merged_schema(&self) -> Schema { + Schema::try_merge( + self.readers + .iter() + .map(|stream| stream.schema().deref().clone()), + ) + .unwrap() + } +} + #[inline(always)] fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { serde_json::to_vec(any) From 607334290971c84c0ff4538c826b02cfacd49ba3 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 6 Feb 2023 11:51:41 +0530 Subject: [PATCH 06/22] Fixes --- server/Cargo.toml | 1 + server/src/event.rs | 2 +- server/src/handlers/event.rs | 2 +- server/src/handlers/logstream.rs | 2 +- 
server/src/storage.rs | 4 ---- server/src/validator.rs | 2 +- 6 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 60c2630fa..be7640f26 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -67,6 +67,7 @@ ulid = { version = "1.0", features = ["serde"] } ureq = { version = "2.5.0", features = ["json"] } dashmap = "5.4.0" hex = "0.4.3" +itertools = "0.10.5" [build-dependencies] static-files = "0.2.1" diff --git a/server/src/event.rs b/server/src/event.rs index 2cf5bc8a0..21e1c3a7e 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -227,7 +227,7 @@ fn commit_schema( .expect("map of schemas is serializable"); let storage = CONFIG.storage().get_object_store(); - futures::executor::block_on(storage.put_schema_map(&stream_name, &schema_map))?; + futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map))?; stream_metadata.set_unchecked(stream_name, schema_key, schema); } diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 74eb27e50..1a7e817a8 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -108,7 +108,7 @@ async fn push_logs( let event = event::Event { body, stream_name: stream_name.clone(), - schema_key: schema_key, + schema_key, }; event.process().await?; diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 16e9af9bf..d743fb48d 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -154,7 +154,7 @@ pub async fn put_alert( .schema_map(&stream_name) .map_err(|_| StreamError::StreamNotFound(stream_name.to_owned()))?; - if schemas.len() == 0 { + if schemas.is_empty() { return Err(StreamError::UninitializedLogstream); } diff --git a/server/src/storage.rs b/server/src/storage.rs index bbf085d0f..187978911 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -254,8 +254,6 @@ impl StorageDir { #[derive(Debug, thiserror::Error)] pub enum MoveDataError { - #[error("Unable to Open file after moving")] - Open, #[error("Unable to create recordbatch stream")] Arrow(#[from] ArrowError), #[error("Could not generate parquet file")] @@ -264,8 +262,6 @@ pub enum MoveDataError { ObjectStorage(#[from] ObjectStorageError), #[error("Could not generate parquet file")] Create, - #[error("Could not delete temp arrow file")] - Delete, } #[derive(Debug, thiserror::Error)] diff --git a/server/src/validator.rs b/server/src/validator.rs index f791b79d6..ce07dbac0 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -160,7 +160,7 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result Date: Mon, 6 Feb 2023 13:02:01 +0530 Subject: [PATCH 07/22] Add adapt schema --- server/src/storage/object_storage.rs | 80 +++++++++++++++++++--------- 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index e3bccfcb9..42ccc2345 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -27,7 +27,9 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion::{ arrow::{ - array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch, + array::{new_null_array, ArrayRef, TimestampMillisecondArray}, + ipc::reader::StreamReader, + record_batch::RecordBatch, }, datasource::listing::ListingTable, execution::runtime_env::RuntimeEnv, @@ -251,13 +253,10 @@ pub trait ObjectStorage: Sync + 'static { parquet_table.upsert(&parquet_path); let props = 
WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new( - parquet_file, - Arc::new(record_reader.merged_schema()), - Some(props), - )?; + let schema = Arc::new(record_reader.merged_schema()); + let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; - for ref record in record_reader.merged_iter() { + for ref record in record_reader.merged_iter(&schema) { writer.write(record)?; } @@ -354,6 +353,33 @@ fn get_parquet_path(file: &Path) -> PathBuf { parquet_path } +pub fn adapt_batch(table_schema: &Schema, batch: RecordBatch) -> RecordBatch { + if table_schema.eq(&batch.schema()) { + return batch; + } + + let batch_rows = batch.num_rows(); + let batch_schema = &*batch.schema(); + + let mut cols: Vec = Vec::with_capacity(batch.num_columns()); + let batch_cols = batch.columns().to_vec(); + + for table_field in table_schema.fields() { + if let Some((batch_idx, _name)) = batch_schema.column_with_name(table_field.name().as_str()) + { + cols.push(Arc::clone(&batch_cols[batch_idx])); + } else { + cols.push(new_null_array(table_field.data_type(), batch_rows)) + } + } + + let merged_schema = Arc::new(table_schema.clone()); + + let merged_batch = RecordBatch::try_new(merged_schema, cols).unwrap(); + + merged_batch +} + #[derive(Debug)] struct MergedRecordReader { readers: Vec>, @@ -371,25 +397,27 @@ impl MergedRecordReader { Ok(Self { readers }) } - fn merged_iter(self) -> impl Iterator { - kmerge_by( - self.readers.into_iter().map(|reader| reader.flatten()), - |a: &RecordBatch, b: &RecordBatch| { - let a: &TimestampMillisecondArray = a - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - let b: &TimestampMillisecondArray = b - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - a.value(0) < b.value(0) - }, - ) + fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { + let adapted_readers = self + .readers + .into_iter() + .map(move |reader| reader.flatten().map(|batch| adapt_batch(schema, batch))); + + kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| { + let a: &TimestampMillisecondArray = a + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + let b: &TimestampMillisecondArray = b + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + a.value(0) < b.value(0) + }) } fn merged_schema(&self) -> Schema { From 746ec33dc1643a93d06932d7d311207f306f2918 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 6 Feb 2023 22:41:44 +0530 Subject: [PATCH 08/22] merged schema --- server/src/handlers/logstream.rs | 17 ++++------------- server/src/metadata.rs | 20 ++++++++++++++++++++ server/src/query.rs | 7 +++---- server/src/validator.rs | 10 ++++++---- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index d743fb48d..91caa0e65 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -20,6 +20,7 @@ use std::fs; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; +use arrow_schema::Schema; use chrono::Utc; use serde_json::Value; @@ -59,23 +60,13 @@ pub async fn delete(req: HttpRequest) -> Result { } pub async fn list(_: HttpRequest) -> impl Responder { - let body = CONFIG - .storage() - .get_object_store() - .list_streams() - .await - .unwrap(); - web::Json(body) + web::Json(STREAM_INFO.list_streams()) } pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - let schemas = STREAM_INFO - 
.schema_map(&stream_name) - .map_err(|_| StreamError::StreamNotFound(stream_name.to_owned()))?; - - Ok((web::Json(schemas), StatusCode::OK)) + let schema = STREAM_INFO.merged_schemas(&stream_name)?; + Ok((web::Json(schema), StatusCode::OK)) } pub async fn get_alert(req: HttpRequest) -> Result { diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 7259f48de..697abf8c1 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -74,6 +74,10 @@ impl STREAM_INFO { map.contains_key(stream_name) } + pub fn stream_initialized(&self, stream_name: &str) -> Result { + Ok(!self.schema_map(stream_name)?.is_empty()) + } + pub fn schema( &self, stream_name: &str, @@ -101,6 +105,22 @@ impl STREAM_INFO { Ok(schemas) } + pub fn merged_schema(&self, stream_name: &str) -> Result { + let map = self.read().expect(LOCK_EXPECT); + let schemas = map + .get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))? + .schema; + let schema = Schema::try_merge( + schemas.values() + .map(|schema| &**schema) + .cloned(), + ) + .expect("mergeable schemas")); + + OK(schema) + } + pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) diff --git a/server/src/query.rs b/server/src/query.rs index 89db27d92..86109edc0 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -49,7 +49,7 @@ fn get_value(value: &Value, key: Key) -> Result<&str, Key> { pub struct Query { pub query: String, pub stream_name: String, - pub schemas: HashMap>, + pub merged_schema: Arc, pub start: DateTime, pub end: DateTime, } @@ -88,9 +88,8 @@ impl Query { res } - pub fn get_schema(&self) -> Schema { - Schema::try_merge(self.schemas.values().map(|schema| schema.as_ref()).cloned()) - .expect("mergable shcemas") + pub fn get_schema(&self) -> Arc { + self.merged_schema } /// Execute query on object storage(and if necessary on cache as well) with given stream information diff --git a/server/src/validator.rs b/server/src/validator.rs index ce07dbac0..75a0dbc5a 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -16,6 +16,8 @@ * */ +use std::sync::Arc; + use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; @@ -158,18 +160,18 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result Date: Tue, 7 Feb 2023 11:51:49 +0530 Subject: [PATCH 09/22] Refactor --- server/src/event.rs | 53 ++------ server/src/handlers/event.rs | 2 +- server/src/handlers/logstream.rs | 24 ++-- server/src/metadata.rs | 14 +- server/src/query.rs | 26 ++-- server/src/query/table_provider.rs | 31 +++-- server/src/storage/localfs.rs | 2 +- server/src/storage/object_storage.rs | 54 ++------ server/src/storage/s3.rs | 2 +- server/src/utils.rs | 184 +-------------------------- server/src/utils/batch_adapter.rs | 43 +++++++ server/src/utils/header_parsing.rs | 81 ++++++++++++ server/src/utils/json.rs | 42 ++++++ server/src/utils/uid.rs | 25 ++++ server/src/utils/update.rs | 95 ++++++++++++++ 15 files changed, 365 insertions(+), 313 deletions(-) create mode 100644 server/src/utils/batch_adapter.rs create mode 100644 server/src/utils/header_parsing.rs create mode 100644 server/src/utils/json.rs create mode 100644 server/src/utils/uid.rs create mode 100644 server/src/utils/update.rs diff --git a/server/src/event.rs b/server/src/event.rs index 21e1c3a7e..547aaeff6 100644 --- 
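As a self-contained sketch of the merged-schema path introduced here, the snippet below shows how arrow's Schema::try_merge unions per-event schemas for one stream; the field names are invented and only the p_timestamp column mirrors the real layout.

use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn main() {
    // two schema variants as they might be registered for the same stream
    let first = Schema::new(vec![
        Field::new("p_timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("message", DataType::Utf8, true),
    ]);
    let second = Schema::new(vec![
        Field::new("p_timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("status_code", DataType::Int64, true),
    ]);

    // try_merge unions fields in first-seen order and only fails when the same
    // field name reappears with an incompatible data type
    let merged = Schema::try_merge(vec![first, second]).expect("compatible schemas");
    let names: Vec<&str> = merged.fields().iter().map(|f| f.name().as_str()).collect();
    assert_eq!(names, ["p_timestamp", "message", "status_code"]);
}
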
a/server/src/event.rs +++ b/server/src/event.rs @@ -19,7 +19,7 @@ mod writer; * */ use arrow_schema::{DataType, Field, TimeUnit}; -use chrono::{DateTime, Utc}; +use chrono::Utc; use datafusion::arrow::array::{Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::error::ArrowError; @@ -40,7 +40,6 @@ use self::error::EventError; pub use self::writer::STREAM_WRITERS; const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; -const TIME_KEYS: &[&str] = &["time", "date", "datetime", "timestamp"]; #[derive(Clone)] pub struct Event { @@ -59,36 +58,20 @@ impl Event { return Err(EventError::SchemaMismatch); }; - if event - .schema() - .column_with_name(DEFAULT_TIMESTAMP_KEY) - .is_some() - { - let rows = event.num_rows(); - let timestamp_array = Arc::new(get_timestamp_array(rows)); - event = replace(schema, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); - } + let rows = event.num_rows(); + let timestamp_array = Arc::new(get_timestamp_array(rows)); + event = replace(schema, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); self.process_event(&event)?; } else { // if stream schema is none then it is first event, // process first event and store schema in obect store - // check for a possible datetime field - let time_field = get_datetime_field(&self.body); - - if time_field.is_none() { - let schema = add_default_timestamp_field(self.infer_schema()?)?; - let schema_ref = Arc::new(schema.clone()); - let event = self.get_record(schema_ref.clone())?; - let timestamp_array = Arc::new(get_timestamp_array(event.num_rows())); - let event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); - self.process_first_event(&event, schema)?; - } else { - let schema = self.infer_schema()?; - let schema_ref = Arc::new(schema.clone()); - let event = self.get_record(schema_ref)?; - self.process_first_event(&event, schema)?; - } + let schema = add_default_timestamp_field(self.infer_schema()?)?; + let schema_ref = Arc::new(schema.clone()); + let event = self.get_record(schema_ref.clone())?; + let timestamp_array = Arc::new(get_timestamp_array(event.num_rows())); + let event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); + self.process_first_event(&event, schema)?; }; metadata::STREAM_INFO.update_stats( @@ -240,7 +223,7 @@ fn replace( column: &str, arr: Arc, ) -> RecordBatch { - let index = schema.column_with_name(column).unwrap().0; + let (index, _) = schema.column_with_name(column).unwrap(); let mut arrays = batch.columns().to_vec(); arrays[index] = arr; @@ -252,20 +235,6 @@ fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { TimestampMillisecondArray::from_value(time.timestamp_millis(), size) } -fn get_datetime_field(json: &Value) -> Option<&str> { - let Value::Object(object) = json else { panic!() }; - for (key, value) in object { - if TIME_KEYS.contains(&key.as_str()) { - if let Value::String(maybe_datetime) = value { - if DateTime::parse_from_rfc3339(maybe_datetime).is_ok() { - return Some(key); - } - } - } - } - None -} - trait UncheckedOp: DerefMut> { fn get_unchecked(&self, stream_name: &str, schema_key: &str) -> Option> { self.get(stream_name) diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 1a7e817a8..8aab87adb 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -24,7 +24,7 @@ use crate::option::CONFIG; use crate::query::Query; use crate::response::QueryResponse; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; -use 
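To make the p_timestamp handling above concrete, here is a hedged, standalone sketch of stamping ingestion time over a placeholder column; the message field and values are invented, and the column swap mirrors the shape of the replace helper rather than reusing it.

use std::sync::Arc;

use chrono::Utc;
use datafusion::arrow::array::{new_null_array, ArrayRef, StringArray, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("p_timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("message", DataType::Utf8, true),
    ]));

    // decoded rows arrive with a null placeholder in the p_timestamp column
    let rows = 2;
    let placeholder = new_null_array(schema.field(0).data_type(), rows);
    let messages: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![placeholder, messages]).unwrap();

    // overwrite the placeholder with the server-side ingestion time
    let stamped: ArrayRef = Arc::new(TimestampMillisecondArray::from_value(
        Utc::now().timestamp_millis(),
        rows,
    ));
    let (index, _) = schema.column_with_name("p_timestamp").unwrap();
    let mut columns = batch.columns().to_vec();
    columns[index] = stamped;
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    assert_eq!(batch.column(index).null_count(), 0);
}
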
crate::utils::{flatten_json_body, merge}; +use crate::utils::json::{flatten_json_body, merge}; use self::error::{PostError, QueryError}; diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 91caa0e65..d092cb857 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -20,7 +20,6 @@ use std::fs; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; -use arrow_schema::Schema; use chrono::Utc; use serde_json::Value; @@ -65,7 +64,7 @@ pub async fn list(_: HttpRequest) -> impl Responder { pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let schema = STREAM_INFO.merged_schemas(&stream_name)?; + let schema = STREAM_INFO.merged_schema(&stream_name)?; Ok((web::Json(schema), StatusCode::OK)) } @@ -141,19 +140,13 @@ pub async fn put_alert( validator::alert(&alerts)?; - let schemas = STREAM_INFO - .schema_map(&stream_name) - .map_err(|_| StreamError::StreamNotFound(stream_name.to_owned()))?; - - if schemas.is_empty() { + if !STREAM_INFO.stream_initialized(&stream_name)? { return Err(StreamError::UninitializedLogstream); } + let schema = STREAM_INFO.merged_schema(&stream_name)?; for alert in &alerts.alerts { - if !schemas - .values() - .any(|schema| alert.rule.valid_for_schema(schema)) - { + if !alert.rule.valid_for_schema(&schema) { return Err(StreamError::InvalidAlert(alert.name.to_owned())); } } @@ -245,6 +238,7 @@ pub mod error { use http::StatusCode; use crate::{ + metadata::error::stream_info::MetadataError, storage::ObjectStorageError, validator::error::{AlertValidationError, StreamNameValidationError}, }; @@ -295,4 +289,12 @@ pub mod error { .body(self.to_string()) } } + + impl From for StreamError { + fn from(value: MetadataError) -> Self { + match value { + MetadataError::StreamMetaNotFound(s) => StreamError::StreamNotFound(s), + } + } + } } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 697abf8c1..ee4184215 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -107,18 +107,14 @@ impl STREAM_INFO { pub fn merged_schema(&self, stream_name: &str) -> Result { let map = self.read().expect(LOCK_EXPECT); - let schemas = map + let schemas = &map .get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))? 
.schema; - let schema = Schema::try_merge( - schemas.values() - .map(|schema| &**schema) - .cloned(), - ) - .expect("mergeable schemas")); - - OK(schema) + let schema = Schema::try_merge(schemas.values().map(|schema| &**schema).cloned()) + .expect("mergeable schemas"); + + Ok(schema) } pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { diff --git a/server/src/query.rs b/server/src/query.rs index 86109edc0..b7e8e5e9d 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -25,7 +25,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; use datafusion::prelude::*; use serde_json::Value; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -72,24 +72,14 @@ impl Query { } pub fn get_prefixes(&self) -> Vec { - let datetime_prefixes = self._get_prefixes(); - let mut res = Vec::new(); - - for schema_key in self.schemas.keys() { - let prefix = format!("{}/{}", self.stream_name, schema_key); - - res.extend( - datetime_prefixes - .iter() - .map(|datetime_prefix| format!("{}/{}", prefix, datetime_prefix)), - ) - } - - res + self._get_prefixes() + .into_iter() + .map(|key| format!("{}/{}", self.stream_name, key)) + .collect() } - pub fn get_schema(&self) -> Arc { - self.merged_schema + pub fn get_schema(&self) -> &Schema { + &self.merged_schema } /// Execute query on object storage(and if necessary on cache as well) with given stream information @@ -129,7 +119,7 @@ impl Query { arrow_files, parquet_files, storage.query_table(self)?, - Arc::new(self.get_schema()), + Arc::new(self.get_schema().clone()), )); ctx.register_table( diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index e6aff729b..2c3ff995f 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -37,6 +37,8 @@ use std::fs::File; use std::path::PathBuf; use std::sync::Arc; +use crate::utils::batch_adapter::adapt_batch; + pub struct QueryTableProvider { arrow_files: Vec, parquet_files: Vec, @@ -77,7 +79,12 @@ impl QueryTableProvider { let mut mem_records: Vec> = Vec::new(); let mut parquet_files = self.parquet_files.clone(); for file in &self.arrow_files { - load_arrows(file, &mut mem_records, &mut parquet_files); + load_arrows( + file, + Some(&self.schema), + &mut mem_records, + &mut parquet_files, + ); } let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?; @@ -168,20 +175,24 @@ fn local_parquet_table( fn load_arrows( file: &PathBuf, + schema: Option<&Schema>, mem_records: &mut Vec>, parquet_files: &mut Vec, ) { let Ok(arrow_file) = File::open(file) else { return; }; let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return; }; - let records = reader - .filter_map(|record| match record { - Ok(record) => Some(record), - Err(e) => { - log::warn!("warning from arrow stream {:?}", e); - None - } - }) - .collect(); + let records = reader.filter_map(|record| match record { + Ok(record) => Some(record), + Err(e) => { + log::warn!("warning from arrow stream {:?}", e); + None + } + }); + let records = match schema { + Some(schema) => records.map(|record| adapt_batch(schema, record)).collect(), + None => records.collect(), + }; + mem_records.push(records); let mut file = file.clone(); file.set_extension("parquet"); diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 9eca3ddfd..51d1be4d3 100644 --- a/server/src/storage/localfs.rs +++ 
b/server/src/storage/localfs.rs @@ -191,7 +191,7 @@ impl ObjectStorage for LocalFS { let config = ListingTableConfig::new_with_multi_paths(prefixes) .with_listing_options(listing_options) - .with_schema(Arc::new(query.get_schema())); + .with_schema(Arc::new(query.get_schema().clone())); Ok(Some(ListingTable::try_new(config)?)) } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 42ccc2345..2197fe938 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -20,16 +20,17 @@ use super::{ file_link::CacheState, LogStream, MoveDataError, ObjectStorageError, ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, CACHED_FILES, }; -use crate::{alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats}; +use crate::{ + alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats, + utils::batch_adapter::adapt_batch, +}; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{ arrow::{ - array::{new_null_array, ArrayRef, TimestampMillisecondArray}, - ipc::reader::StreamReader, - record_batch::RecordBatch, + array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch, }, datasource::listing::ListingTable, execution::runtime_env::RuntimeEnv, @@ -46,6 +47,7 @@ use std::{ collections::HashMap, fs::{self, File}, path::{Path, PathBuf}, + process, sync::Arc, }; @@ -207,39 +209,31 @@ pub trait ObjectStorage: Sync + 'static { let mut stream_stats = HashMap::new(); for stream in &streams { - // get dir let dir = StorageDir::new(stream); // walk dir, find all .arrows files and convert to parquet - // Do not include file which is being written to let time = chrono::Utc::now().naive_utc(); let hot_filename = StorageDir::file_time_suffix(time); let mut arrow_files = dir.arrow_files(); - arrow_files.retain(|file| { !file .file_name() .expect("is a not none filename") .to_str() - .unwrap() + .expect("valid unicode") .ends_with(&hot_filename) }); - // hashmap time = vec[paths] + // hashmap let mut grouped_arrow_file: HashMap<&str, Vec<&Path>> = HashMap::new(); - for file in &arrow_files { let key = file .file_name() .expect("is a not none filename") .to_str() .unwrap(); - let (_, key) = key.split_once('.').unwrap(); - - let key = key.strip_suffix("data.arrows").unwrap(); - grouped_arrow_file.entry(key).or_default().push(file); } @@ -263,7 +257,10 @@ pub trait ObjectStorage: Sync + 'static { writer.close()?; for file in files { - let _ = fs::remove_file(file); + if fs::remove_file(file).is_err() { + log::error!("Failed to delete file. 
Unstable state"); + process::abort() + } } } @@ -353,33 +350,6 @@ fn get_parquet_path(file: &Path) -> PathBuf { parquet_path } -pub fn adapt_batch(table_schema: &Schema, batch: RecordBatch) -> RecordBatch { - if table_schema.eq(&batch.schema()) { - return batch; - } - - let batch_rows = batch.num_rows(); - let batch_schema = &*batch.schema(); - - let mut cols: Vec = Vec::with_capacity(batch.num_columns()); - let batch_cols = batch.columns().to_vec(); - - for table_field in table_schema.fields() { - if let Some((batch_idx, _name)) = batch_schema.column_with_name(table_field.name().as_str()) - { - cols.push(Arc::clone(&batch_cols[batch_idx])); - } else { - cols.push(new_null_array(table_field.data_type(), batch_rows)) - } - } - - let merged_schema = Arc::new(table_schema.clone()); - - let merged_batch = RecordBatch::try_new(merged_schema, cols).unwrap(); - - merged_batch -} - #[derive(Debug)] struct MergedRecordReader { readers: Vec>, diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 561d88109..ba1eba3b3 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -345,7 +345,7 @@ impl ObjectStorage for S3 { let config = ListingTableConfig::new_with_multi_paths(prefixes) .with_listing_options(listing_options) - .with_schema(Arc::new(query.get_schema())); + .with_schema(Arc::new(query.get_schema().clone())); Ok(Some(ListingTable::try_new(config)?)) } diff --git a/server/src/utils.rs b/server/src/utils.rs index 5eef313f7..4b4fb91e8 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -16,97 +16,15 @@ * */ +pub mod batch_adapter; +pub mod header_parsing; +pub mod json; +pub mod uid; +pub mod update; + use std::path::Path; use chrono::{DateTime, NaiveDate, Timelike, Utc}; -use serde_json::{json, Value}; - -pub fn flatten_json_body(body: &serde_json::Value) -> Result { - let mut flat_value: Value = json!({}); - flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap(); - Ok(flat_value) -} - -pub fn merge(value: &mut Value, fields: impl Iterator) { - if let Value::Object(m) = value { - for (k, v) in fields { - match m.get_mut(&k) { - Some(val) => { - *val = v; - } - None => { - m.insert(k, v); - } - } - } - } -} - -pub mod header_parsing { - const MAX_HEADERS_ALLOWED: usize = 10; - use actix_web::{HttpRequest, HttpResponse, ResponseError}; - - pub fn collect_labelled_headers( - req: &HttpRequest, - prefix: &str, - kv_separator: char, - ) -> Result { - // filter out headers which has right prefix label and convert them into str; - let headers = req.headers().iter().filter_map(|(key, value)| { - let key = key.as_str().strip_prefix(prefix)?; - Some((key, value)) - }); - - let mut labels: Vec = Vec::new(); - - for (key, value) in headers { - let value = value.to_str().map_err(|_| ParseHeaderError::InvalidValue)?; - if key.is_empty() { - return Err(ParseHeaderError::Emptykey); - } - if key.contains(kv_separator) { - return Err(ParseHeaderError::SeperatorInKey(kv_separator)); - } - if value.contains(kv_separator) { - return Err(ParseHeaderError::SeperatorInValue(kv_separator)); - } - - labels.push(format!("{key}={value}")); - } - - if labels.len() > MAX_HEADERS_ALLOWED { - return Err(ParseHeaderError::MaxHeadersLimitExceeded); - } - - Ok(labels.join(&kv_separator.to_string())) - } - - #[derive(Debug, thiserror::Error)] - pub enum ParseHeaderError { - #[error("Too many headers received. 
Limit is of 5 headers")] - MaxHeadersLimitExceeded, - #[error("A value passed in header can't be formatted to plain visible ASCII")] - InvalidValue, - #[error("Invalid Key was passed which terminated just after the end of prefix")] - Emptykey, - #[error("A key passed in header contains reserved char {0}")] - SeperatorInKey(char), - #[error("A value passed in header contains reserved char {0}")] - SeperatorInValue(char), - #[error("Stream name not found in header [x-p-stream]")] - MissingStreamName, - } - - impl ResponseError for ParseHeaderError { - fn status_code(&self) -> http::StatusCode { - http::StatusCode::BAD_REQUEST - } - - fn error_response(&self) -> HttpResponse { - HttpResponse::build(self.status_code()).body(self.to_string()) - } - } -} #[allow(dead_code)] pub fn hostname() -> Option { @@ -133,96 +51,6 @@ pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> { Ok(()) } -pub mod uid { - use ulid::Ulid; - - pub type Uid = Ulid; - - pub fn gen() -> Ulid { - Ulid::new() - } -} - -pub mod update { - use crate::banner::about::current; - use std::env; - use std::{path::Path, time::Duration}; - - use anyhow::anyhow; - use chrono::{DateTime, Utc}; - use ulid::Ulid; - - use crate::storage::StorageMetadata; - - static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST"; - - #[derive(Debug)] - pub struct LatestRelease { - pub version: semver::Version, - pub date: DateTime, - } - - fn is_docker() -> bool { - Path::new("/.dockerenv").exists() - } - - fn is_k8s() -> bool { - env::var(K8S_ENV_TO_CHECK).is_ok() - } - - fn platform() -> &'static str { - if is_k8s() { - "Kubernetes" - } else if is_docker() { - "Docker" - } else { - "Native" - } - } - - // User Agent for Download API call - // Format: Parseable/// (; ) - fn user_agent(uid: &Ulid) -> String { - let info = os_info::get(); - format!( - "Parseable/{}/{}/{} ({}; {})", - uid, - current().0, - current().1, - info.os_type(), - platform() - ) - } - - pub fn get_latest(meta: &StorageMetadata) -> Result { - let agent = ureq::builder() - .user_agent(user_agent(&meta.deployment_id).as_str()) - .timeout(Duration::from_secs(8)) - .build(); - - let json: serde_json::Value = agent - .get("https://download.parseable.io/latest-version") - .call()? - .into_json()?; - - let version = json["tag_name"] - .as_str() - .and_then(|ver| ver.strip_prefix('v')) - .and_then(|ver| semver::Version::parse(ver).ok()) - .ok_or_else(|| anyhow!("Failed parsing version"))?; - - let date = json["published_at"] - .as_str() - .ok_or_else(|| anyhow!("Failed parsing published date"))?; - - let date = chrono::DateTime::parse_from_rfc3339(date) - .expect("date-time from github is in rfc3339 format") - .into(); - - Ok(LatestRelease { version, date }) - } -} - /// Convert minutes to a slot range /// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { diff --git a/server/src/utils/batch_adapter.rs b/server/src/utils/batch_adapter.rs new file mode 100644 index 000000000..7fdd2febe --- /dev/null +++ b/server/src/utils/batch_adapter.rs @@ -0,0 +1,43 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use arrow_schema::Schema; +use datafusion::arrow::array::new_null_array; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::record_batch::RecordBatch; + +use std::sync::Arc; + +pub fn adapt_batch(table_schema: &Schema, batch: RecordBatch) -> RecordBatch { + let batch_schema = &*batch.schema(); + + let mut cols: Vec = Vec::with_capacity(table_schema.fields().len()); + let batch_cols = batch.columns().to_vec(); + + for table_field in table_schema.fields() { + if let Some((batch_idx, _)) = batch_schema.column_with_name(table_field.name().as_str()) { + cols.push(Arc::clone(&batch_cols[batch_idx])); + } else { + cols.push(new_null_array(table_field.data_type(), batch.num_rows())) + } + } + + let merged_schema = Arc::new(table_schema.clone()); + + RecordBatch::try_new(merged_schema, cols).unwrap() +} diff --git a/server/src/utils/header_parsing.rs b/server/src/utils/header_parsing.rs new file mode 100644 index 000000000..0e56aa794 --- /dev/null +++ b/server/src/utils/header_parsing.rs @@ -0,0 +1,81 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +const MAX_HEADERS_ALLOWED: usize = 10; +use actix_web::{HttpRequest, HttpResponse, ResponseError}; + +pub fn collect_labelled_headers( + req: &HttpRequest, + prefix: &str, + kv_separator: char, +) -> Result { + // filter out headers which has right prefix label and convert them into str; + let headers = req.headers().iter().filter_map(|(key, value)| { + let key = key.as_str().strip_prefix(prefix)?; + Some((key, value)) + }); + + let mut labels: Vec = Vec::new(); + + for (key, value) in headers { + let value = value.to_str().map_err(|_| ParseHeaderError::InvalidValue)?; + if key.is_empty() { + return Err(ParseHeaderError::Emptykey); + } + if key.contains(kv_separator) { + return Err(ParseHeaderError::SeperatorInKey(kv_separator)); + } + if value.contains(kv_separator) { + return Err(ParseHeaderError::SeperatorInValue(kv_separator)); + } + + labels.push(format!("{}={}", key, value)); + } + + if labels.len() > MAX_HEADERS_ALLOWED { + return Err(ParseHeaderError::MaxHeadersLimitExceeded); + } + + Ok(labels.join(&kv_separator.to_string())) +} + +#[derive(Debug, thiserror::Error)] +pub enum ParseHeaderError { + #[error("Too many headers received. 
Limit is of 5 headers")] + MaxHeadersLimitExceeded, + #[error("A value passed in header can't be formatted to plain visible ASCII")] + InvalidValue, + #[error("Invalid Key was passed which terminated just after the end of prefix")] + Emptykey, + #[error("A key passed in header contains reserved char {0}")] + SeperatorInKey(char), + #[error("A value passed in header contains reserved char {0}")] + SeperatorInValue(char), + #[error("Stream name not found in header [x-p-stream]")] + MissingStreamName, +} + +impl ResponseError for ParseHeaderError { + fn status_code(&self) -> http::StatusCode { + http::StatusCode::BAD_REQUEST + } + + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()).body(self.to_string()) + } +} diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs new file mode 100644 index 000000000..f1d536d40 --- /dev/null +++ b/server/src/utils/json.rs @@ -0,0 +1,42 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use serde_json; +use serde_json::json; +use serde_json::Value; + +pub fn flatten_json_body(body: &serde_json::Value) -> Result { + let mut flat_value: Value = json!({}); + flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap(); + Ok(flat_value) +} + +pub fn merge(value: &mut Value, fields: impl Iterator) { + if let Value::Object(m) = value { + for (k, v) in fields { + match m.get_mut(&k) { + Some(val) => { + *val = v; + } + None => { + m.insert(k, v); + } + } + } + } +} diff --git a/server/src/utils/uid.rs b/server/src/utils/uid.rs new file mode 100644 index 000000000..8a895d3bf --- /dev/null +++ b/server/src/utils/uid.rs @@ -0,0 +1,25 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use ulid::Ulid; + +pub type Uid = Ulid; + +pub fn gen() -> Ulid { + Ulid::new() +} diff --git a/server/src/utils/update.rs b/server/src/utils/update.rs new file mode 100644 index 000000000..9a0c7be89 --- /dev/null +++ b/server/src/utils/update.rs @@ -0,0 +1,95 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. 
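A dependency-free sketch of the label collection that header_parsing performs, using plain tuples in place of the actix header map; the prefix and separator below are arbitrary stand-ins rather than the server's actual values, and the key/value validation is left out.

fn collect_labels(headers: &[(&str, &str)], prefix: &str, separator: char) -> String {
    headers
        .iter()
        .filter_map(|(key, value)| {
            // keep only headers carrying the label prefix
            let key = key.strip_prefix(prefix)?;
            Some(format!("{key}={value}"))
        })
        .collect::<Vec<_>>()
        .join(&separator.to_string())
}

fn main() {
    let headers = [
        ("x-p-meta-host", "node-1"),
        ("x-p-meta-region", "eu-1"),
        ("content-type", "application/json"),
    ];

    assert_eq!(
        collect_labels(&headers, "x-p-meta-", '^'),
        "host=node-1^region=eu-1"
    );
}
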
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use crate::banner::about::current; +use std::env; +use std::{path::Path, time::Duration}; + +use anyhow::anyhow; +use chrono::{DateTime, Utc}; +use ulid::Ulid; + +use crate::storage::StorageMetadata; + +static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST"; + +#[derive(Debug)] +pub struct LatestRelease { + pub version: semver::Version, + pub date: DateTime, +} + +fn is_docker() -> bool { + Path::new("/.dockerenv").exists() +} + +fn is_k8s() -> bool { + env::var(K8S_ENV_TO_CHECK).is_ok() +} + +fn platform() -> &'static str { + if is_k8s() { + "Kubernetes" + } else if is_docker() { + "Docker" + } else { + "Native" + } +} + +// User Agent for Download API call +// Format: Parseable/// (; ) +fn user_agent(uid: &Ulid) -> String { + let info = os_info::get(); + format!( + "Parseable/{}/{}/{} ({}; {})", + uid, + current().0, + current().1, + info.os_type(), + platform() + ) +} + +pub fn get_latest(meta: &StorageMetadata) -> Result { + let agent = ureq::builder() + .user_agent(user_agent(&meta.deployment_id).as_str()) + .timeout(Duration::from_secs(8)) + .build(); + + let json: serde_json::Value = agent + .get("https://download.parseable.io/latest-version") + .call()? 
+ .into_json()?; + + let version = json["tag_name"] + .as_str() + .and_then(|ver| ver.strip_prefix('v')) + .and_then(|ver| semver::Version::parse(ver).ok()) + .ok_or_else(|| anyhow!("Failed parsing version"))?; + + let date = json["published_at"] + .as_str() + .ok_or_else(|| anyhow!("Failed parsing published date"))?; + + let date = chrono::DateTime::parse_from_rfc3339(date) + .expect("date-time from github is in rfc3339 format") + .into(); + + Ok(LatestRelease { version, date }) +} From 43c6254950a59e5599fd0072e72e97a39a381498 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 7 Feb 2023 14:45:08 +0530 Subject: [PATCH 10/22] Fix --- server/src/utils/header_parsing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/utils/header_parsing.rs b/server/src/utils/header_parsing.rs index 0e56aa794..0708804bf 100644 --- a/server/src/utils/header_parsing.rs +++ b/server/src/utils/header_parsing.rs @@ -44,7 +44,7 @@ pub fn collect_labelled_headers( return Err(ParseHeaderError::SeperatorInValue(kv_separator)); } - labels.push(format!("{}={}", key, value)); + labels.push(format!("{key}={value}")); } if labels.len() > MAX_HEADERS_ALLOWED { From 3b281264f6dc446b6526ba0c61d44a84f798d506 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 7 Feb 2023 17:40:20 +0530 Subject: [PATCH 11/22] Fix list output --- server/src/handlers/logstream.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index d092cb857..57345c09e 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -27,7 +27,7 @@ use crate::alerts::Alerts; use crate::event; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; -use crate::storage::StorageDir; +use crate::storage::{LogStream, StorageDir}; use crate::{metadata, validator}; use self::error::StreamError; @@ -59,7 +59,13 @@ pub async fn delete(req: HttpRequest) -> Result { } pub async fn list(_: HttpRequest) -> impl Responder { - web::Json(STREAM_INFO.list_streams()) + let res: Vec = STREAM_INFO + .list_streams() + .into_iter() + .map(|stream| LogStream { name: stream }) + .collect(); + + web::Json(res) } pub async fn schema(req: HttpRequest) -> Result { From 20eaede0400ad159d7eb18a6fd7ef42a507a28d5 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 7 Feb 2023 17:53:55 +0530 Subject: [PATCH 12/22] Bump chrono version --- server/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index be7640f26..fe274c5a3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -17,7 +17,7 @@ aws-sdk-s3 = "0.19" aws-smithy-async = { version = "0.49.0", features = ["rt-tokio"] } base64 = "0.20.0" bytes = "1" -chrono = "0.4.19" +chrono = "0.4.23" chrono-humanize = "0.2.2" clap = { version = "4.0.32", default-features = false, features = [ "std", From b14774cbba762e514331c07633ce25695c579d60 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 7 Feb 2023 21:51:06 +0530 Subject: [PATCH 13/22] Change to stream version v2 --- server/src/storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage.rs b/server/src/storage.rs index 187978911..a6cee3c3e 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -107,7 +107,7 @@ impl Permisssion { impl Default for ObjectStoreFormat { fn default() -> Self { Self { - version: "v1".to_string(), + version: "v2".to_string(), objectstore_format: "v1".to_string(), created_at: 
Local::now().to_rfc3339(), owner: Owner::new("".to_string(), "".to_string()), From 82127437b8d37ad0bab2ae09c2ab831b9c7f6d3e Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 7 Feb 2023 21:51:33 +0530 Subject: [PATCH 14/22] Fix map serialize --- server/src/event.rs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 547aaeff6..1dad29fe6 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -199,8 +199,11 @@ fn commit_schema( // drop the lock drop(stream_metadata); // Nothing to do - return Ok(()); + Ok(()) } else { + // set to map + stream_metadata.set_unchecked(stream_name, schema_key, schema); + // serialize map let schema_map = serde_json::to_string( &stream_metadata .get(stream_name) @@ -208,13 +211,16 @@ fn commit_schema( .schema, ) .expect("map of schemas is serializable"); - + // try to put to storage let storage = CONFIG.storage().get_object_store(); - futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map))?; - stream_metadata.set_unchecked(stream_name, schema_key, schema); + let res = futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map)); + // revert if err + if res.is_err() { + stream_metadata.remove_unchecked(stream_name, schema_key) + } + // return result + res.map_err(|err| err.into()) } - - Ok(()) } fn replace( @@ -252,6 +258,13 @@ trait UncheckedOp: DerefMut>> UncheckedOp for T {} From d3970b989468f61579fb03d9a134851d19a52ff5 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 8 Feb 2023 13:33:05 +0530 Subject: [PATCH 15/22] add Migration --- server/src/main.rs | 11 ++-- server/src/migration.rs | 53 +++++++++++++++++++ server/src/migration/schema_migration.rs | 32 +++++++++++ .../migration/stream_metadata_migration.rs | 14 +++++ server/src/storage.rs | 5 ++ server/src/storage/object_storage.rs | 8 +++ 6 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 server/src/migration.rs create mode 100644 server/src/migration/schema_migration.rs create mode 100644 server/src/migration/stream_metadata_migration.rs diff --git a/server/src/main.rs b/server/src/main.rs index 4332fc256..e59291331 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -26,24 +26,25 @@ use clokwerk::{AsyncScheduler, Scheduler, TimeUnits}; use log::warn; use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys}; -use std::env; use thread_priority::{ThreadBuilder, ThreadPriority}; +use tokio::sync::oneshot; +use tokio::sync::oneshot::error::TryRecvError; include!(concat!(env!("OUT_DIR"), "/generated.rs")); +use std::env; use std::fs::File; use std::io::BufReader; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::thread::{self, JoinHandle}; use std::time::Duration; -use tokio::sync::oneshot; -use tokio::sync::oneshot::error::TryRecvError; mod alerts; mod banner; mod event; mod handlers; mod metadata; +mod migration; mod option; mod query; mod response; @@ -54,7 +55,6 @@ mod validator; use option::CONFIG; -// Global configurations const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; const API_BASE_PATH: &str = "/api"; const API_VERSION: &str = "v1"; @@ -69,9 +69,12 @@ async fn main() -> anyhow::Result<()> { let metadata = storage::resolve_parseable_metadata().await?; banner::print(&CONFIG, metadata); + migration::run_migration(&CONFIG).await?; + if let Err(e) = metadata::STREAM_INFO.load(&*storage).await { warn!("could not populate local metadata. 
{:?}", e); } + // track all parquet files already in the data directory storage::CACHED_FILES.track_parquet(); diff --git a/server/src/migration.rs b/server/src/migration.rs new file mode 100644 index 000000000..263e845ab --- /dev/null +++ b/server/src/migration.rs @@ -0,0 +1,53 @@ +mod schema_migration; +mod stream_metadata_migration; + +use bytes::Bytes; +use relative_path::RelativePathBuf; +use serde::Serialize; + +use crate::{option::Config, storage::ObjectStorage}; + +pub async fn run_migration(config: &Config) -> anyhow::Result<()> { + let storage = config.storage().get_object_store(); + let streams = storage.list_streams().await?; + + for stream in streams { + migration_stream(&stream.name, &*storage).await? + } + + Ok(()) +} + +async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { + let path = RelativePathBuf::from_iter([stream, ".stream.json"]); + let stream_metadata = storage.get_object(&path).await?; + let stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); + + let maybe_v1 = stream_metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + + if let Some("v1") = maybe_v1 { + let new_stream_metadata = stream_metadata_migration::v1_v2(stream_metadata); + storage + .put_object(&path, to_bytes(&new_stream_metadata)) + .await?; + + let schema_path = RelativePathBuf::from_iter([stream, ".schema"]); + let schema = storage.get_object(&schema_path).await?; + let schema = serde_json::from_slice(&schema).ok(); + let map = schema_migration::v1_v2(schema)?; + storage.put_object(&schema_path, to_bytes(&map)).await?; + } + + Ok(()) +} + +#[inline(always)] +fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { + serde_json::to_vec(any) + .map(|any| any.into()) + .expect("serialize cannot fail") +} diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs new file mode 100644 index 000000000..70c4778e9 --- /dev/null +++ b/server/src/migration/schema_migration.rs @@ -0,0 +1,32 @@ +use std::collections::HashMap; + +use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use itertools::Itertools; +use md5::Digest; + +pub(super) fn v1_v2(schema: Option) -> anyhow::Result> { + let Some(schema) = schema else { return Ok(HashMap::new()) }; + let schema = Schema::try_merge(vec![ + Schema::new(vec![Field::new( + "p_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + )]), + schema, + ])?; + + let list_of_fields = schema + .fields() + .iter() + // skip p_timestamp + .skip(1) + .map(|f| f.name()) + .sorted(); + let mut hasher = md5::Md5::new(); + list_of_fields.for_each(|field| hasher.update(field.as_bytes())); + let key = hex::encode(hasher.finalize()); + let mut map = HashMap::new(); + map.insert(key, schema); + + Ok(map) +} diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs new file mode 100644 index 000000000..b21a203de --- /dev/null +++ b/server/src/migration/stream_metadata_migration.rs @@ -0,0 +1,14 @@ +use serde_json::{json, Value}; + +pub fn v1_v2(mut stream_metadata: Value) -> Value { + let default_stats = json!({ + "events": 0, + "ingestion": 0, + "storage": 0 + }); + let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); + stream_metadata_map.entry("stats").or_insert(default_stats); + stream_metadata_map.insert("version".to_owned(), Value::String("v2".into())); + 
stream_metadata_map.insert("objectstore-format".to_owned(), Value::String("v2".into())); + stream_metadata +} diff --git a/server/src/storage.rs b/server/src/storage.rs index a6cee3c3e..e74982fa6 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -19,6 +19,7 @@ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; +use crate::stats::Stats; use crate::storage::file_link::{FileLink, FileTable}; use crate::utils; @@ -66,13 +67,16 @@ const ACCESS_ALL: &str = "all"; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ObjectStoreFormat { + /// Version of schema registry pub version: String, + /// Version for change in the way how parquet are generated/stored. #[serde(rename = "objectstore-format")] pub objectstore_format: String, #[serde(rename = "created-at")] pub created_at: String, pub owner: Owner, pub permissions: Vec, + pub stats: Stats, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -112,6 +116,7 @@ impl Default for ObjectStoreFormat { created_at: Local::now().to_rfc3339(), owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], + stats: Stats::default(), } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 2197fe938..a527cc9da 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -159,6 +159,14 @@ pub trait ObjectStorage: Sync + 'static { } } + async fn get_stream_metadata( + &self, + stream_name: &str, + ) -> Result { + let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; + Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json")) + } + async fn get_stats(&self, stream_name: &str) -> Result { let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; let stream_metadata: Value = From 64f9f9850945a383f836ca6d2bbafd85d0410366 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 8 Feb 2023 15:36:31 +0530 Subject: [PATCH 16/22] Replace with matches --- server/src/migration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/migration.rs b/server/src/migration.rs index 263e845ab..ec3bdc055 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -29,7 +29,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: .and_then(|meta| meta.get("version")) .and_then(|version| version.as_str()); - if let Some("v1") = maybe_v1 { + if matches!(maybe_v1, Some("v1")) { let new_stream_metadata = stream_metadata_migration::v1_v2(stream_metadata); storage .put_object(&path, to_bytes(&new_stream_metadata)) From 84d619c05a4d51cab6bdbe37c930fc1e387ca80c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 8 Feb 2023 16:28:35 +0530 Subject: [PATCH 17/22] Move mod --- server/src/event.rs | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 1dad29fe6..7b39eeb1a 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -1,33 +1,34 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. 
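As a side note on the version probe rewritten with matches! above, the check amounts to reading an optional string out of loose JSON before deciding whether migration runs; the sketch below reproduces that shape with invented metadata documents.

use serde_json::{json, Value};

fn needs_v1_migration(stream_metadata: &Value) -> bool {
    let version = stream_metadata
        .as_object()
        .and_then(|meta| meta.get("version"))
        .and_then(|version| version.as_str());

    matches!(version, Some("v1"))
}

fn main() {
    assert!(needs_v1_migration(&json!({ "version": "v1" })));
    assert!(!needs_v1_migration(&json!({ "version": "v2" })));
    assert!(!needs_v1_migration(&json!({ "created-at": "2023-02-08" })));
}
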
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + mod writer; -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * - */ use arrow_schema::{DataType, Field, TimeUnit}; use chrono::Utc; use datafusion::arrow::array::{Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::error::ArrowError; - use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions}; use datafusion::arrow::record_batch::RecordBatch; use md5::Digest; use serde_json::Value; + use std::collections::HashMap; use std::ops::DerefMut; use std::sync::Arc; From c74e57a6bbfa33cc65ae54985b120129f6a0297a Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 8 Feb 2023 16:32:17 +0530 Subject: [PATCH 18/22] Add banner --- server/src/migration.rs | 19 +++++++++++++++++++ server/src/migration/schema_migration.rs | 19 +++++++++++++++++++ .../migration/stream_metadata_migration.rs | 19 +++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/server/src/migration.rs b/server/src/migration.rs index ec3bdc055..67f38670a 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -1,3 +1,22 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + mod schema_migration; mod stream_metadata_migration; diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs index 70c4778e9..2aa6937a6 100644 --- a/server/src/migration/schema_migration.rs +++ b/server/src/migration/schema_migration.rs @@ -1,3 +1,22 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. 
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + use std::collections::HashMap; use arrow_schema::{DataType, Field, Schema, TimeUnit}; diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index b21a203de..386056482 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -1,3 +1,22 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + use serde_json::{json, Value}; pub fn v1_v2(mut stream_metadata: Value) -> Value { From 0482f68a9987aec01e0a99007f89ac55fac5b781 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 9 Feb 2023 13:53:48 +0530 Subject: [PATCH 19/22] Query Fix --- server/src/handlers/event.rs | 2 +- server/src/query.rs | 41 +++++---- server/src/query/table_provider.rs | 127 ++++++++++++++++----------- server/src/storage.rs | 55 +++++++++++- server/src/storage/localfs.rs | 15 ++-- server/src/storage/object_storage.rs | 59 ++++--------- server/src/storage/s3.rs | 15 ++-- 7 files changed, 184 insertions(+), 130 deletions(-) diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 8aab87adb..6daf8dbd1 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -39,7 +39,7 @@ pub async fn query(_req: HttpRequest, json: web::Json) -> Result::into) diff --git a/server/src/query.rs b/server/src/query.rs index b7e8e5e9d..0f8a1d1b7 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -24,10 +24,11 @@ use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; use datafusion::prelude::*; +use itertools::Itertools; use serde_json::Value; +use std::collections::hash_map::RandomState; use std::collections::HashSet; -use std::path::Path; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use crate::option::CONFIG; @@ -86,29 +87,26 @@ impl Query { /// TODO: find a way to query all selected parquet files together in a single context. 
pub async fn execute( &self, - storage: &(impl ObjectStorage + ?Sized), + storage: Arc, ) -> Result, ExecuteError> { let dir = StorageDir::new(&self.stream_name); - // take a look at local dir and figure out what local cache we could use for this query - let arrow_files: Vec = dir - .arrow_files() + let staging_arrows = dir + .arrow_files_grouped_by_time() .into_iter() - .filter(|path| path_intersects_query(path, self.start, self.end)) - .collect(); + .filter(|(path, _)| path_intersects_query(path, self.start, self.end)) + .sorted_by(|(a, _), (b, _)| Ord::cmp(a, b)) + .collect_vec(); - let possible_parquet_files = arrow_files.iter().cloned().map(|mut path| { - path.set_extension("parquet"); - path - }); + let staging_parquet_set: HashSet<&PathBuf, RandomState> = + HashSet::from_iter(staging_arrows.iter().map(|(p, _)| p)); - let parquet_files = dir + let other_staging_parquet = dir .parquet_files() .into_iter() - .filter(|path| path_intersects_query(path, self.start, self.end)); - - let parquet_files: HashSet = possible_parquet_files.chain(parquet_files).collect(); - let parquet_files = Vec::from_iter(parquet_files.into_iter()); + .filter(|path| path_intersects_query(path, self.start, self.end)) + .filter(|path| !staging_parquet_set.contains(path)) + .collect_vec(); let ctx = SessionContext::with_config_rt( SessionConfig::default(), @@ -116,9 +114,10 @@ impl Query { ); let table = Arc::new(QueryTableProvider::new( - arrow_files, - parquet_files, - storage.query_table(self)?, + staging_arrows, + other_staging_parquet, + self.get_prefixes(), + storage, Arc::new(self.get_schema().clone()), )); @@ -148,7 +147,7 @@ fn time_from_path(path: &Path) -> DateTime { // split by . and skip first part because that is schema key. // Next three in order will be date, hour and minute - let mut components = prefix.splitn(4, '.').skip(1); + let mut components = prefix.splitn(3, '.'); let date = components.next().expect("date=xxxx-xx-xx"); let hour = components.next().expect("hour=xx"); diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 2c3ff995f..b09bb405b 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -32,38 +32,47 @@ use datafusion::logical_expr::TableType; use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; +use itertools::Itertools; use std::any::Any; use std::fs::File; use std::path::PathBuf; use std::sync::Arc; -use crate::utils::batch_adapter::adapt_batch; +use crate::storage::ObjectStorage; pub struct QueryTableProvider { - arrow_files: Vec, - parquet_files: Vec, - storage: Option, + // parquet - ( arrow files ) + staging_arrows: Vec<(PathBuf, Vec)>, + other_staging_parquet: Vec, + storage_prefixes: Vec, + storage: Arc, schema: Arc, } impl QueryTableProvider { pub fn new( - arrow_files: Vec, - parquet_files: Vec, - storage: Option, + staging_arrows: Vec<(PathBuf, Vec)>, + other_staging_parquet: Vec, + storage_prefixes: Vec, + storage: Arc, schema: Arc, ) -> Self { // By the time this query executes the arrow files could be converted to parquet files // we want to preserve these files as well in case - let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); - for file in &parquet_files { + + for file in staging_arrows + .iter() + .map(|(p, _)| p) + .chain(other_staging_parquet.iter()) + { parquet_cached.upsert(file) } Self { - arrow_files, - parquet_files, + staging_arrows, + other_staging_parquet, + storage_prefixes, 
storage, schema, } @@ -77,15 +86,14 @@ impl QueryTableProvider { limit: Option, ) -> Result, DataFusionError> { let mut mem_records: Vec> = Vec::new(); - let mut parquet_files = self.parquet_files.clone(); - for file in &self.arrow_files { - load_arrows( - file, - Some(&self.schema), - &mut mem_records, - &mut parquet_files, - ); + let mut parquet_files = Vec::new(); + + for (staging_parquet, arrow_files) in &self.staging_arrows { + if !load_arrows(arrow_files, &self.schema, &mut mem_records) { + parquet_files.push(staging_parquet.clone()) + } } + parquet_files.extend(self.other_staging_parquet.clone()); let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?; let memexec = memtable.scan(ctx, projection, filters, limit).await?; @@ -93,13 +101,22 @@ impl QueryTableProvider { let cache_exec = if parquet_files.is_empty() { memexec } else { - let listtable = local_parquet_table(&parquet_files, &self.schema)?; - let listexec = listtable.scan(ctx, projection, filters, limit).await?; - Arc::new(UnionExec::new(vec![memexec, listexec])) + match local_parquet_table(&parquet_files, &self.schema) { + Some(table) => { + let listexec = table.scan(ctx, projection, filters, limit).await?; + Arc::new(UnionExec::new(vec![memexec, listexec])) + } + None => memexec, + } }; let mut exec = vec![cache_exec]; - if let Some(ref storage_listing) = self.storage { + + let table = self + .storage + .query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?; + + if let Some(ref storage_listing) = table { exec.push( storage_listing .scan(ctx, projection, filters, limit) @@ -114,7 +131,12 @@ impl QueryTableProvider { impl Drop for QueryTableProvider { fn drop(&mut self) { let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); - for file in &self.parquet_files { + for file in self + .staging_arrows + .iter() + .map(|(p, _)| p) + .chain(self.other_staging_parquet.iter()) + { parquet_cached.remove(file) } } @@ -146,10 +168,7 @@ impl TableProvider for QueryTableProvider { } } -fn local_parquet_table( - parquet_files: &[PathBuf], - schema: &SchemaRef, -) -> Result { +fn local_parquet_table(parquet_files: &[PathBuf], schema: &SchemaRef) -> Option { let listing_options = ListingOptions { file_extension: ".parquet".to_owned(), format: Arc::new(ParquetFormat::default().with_enable_pruning(true)), @@ -160,41 +179,45 @@ fn local_parquet_table( let paths = parquet_files .iter() - .map(|path| { + .flat_map(|path| { ListingTableUrl::parse(path.to_str().expect("path should is valid unicode")) - .expect("path is valid for filesystem listing") }) - .collect(); + .collect_vec(); + + if paths.is_empty() { + return None; + } let config = ListingTableConfig::new_with_multi_paths(paths) .with_listing_options(listing_options) .with_schema(Arc::clone(schema)); - ListingTable::try_new(config) + match ListingTable::try_new(config) { + Ok(table) => Some(table), + Err(err) => { + log::error!("Local parquet query failed due to err: {err}"); + None + } + } } fn load_arrows( - file: &PathBuf, - schema: Option<&Schema>, + files: &[PathBuf], + schema: &Schema, mem_records: &mut Vec>, - parquet_files: &mut Vec, -) { - let Ok(arrow_file) = File::open(file) else { return; }; - let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return; }; - let records = reader.filter_map(|record| match record { - Ok(record) => Some(record), - Err(e) => { - log::warn!("warning from arrow stream {:?}", e); - None - } - }); - let records = match schema { - Some(schema) => records.map(|record| 
adapt_batch(schema, record)).collect(), - None => records.collect(), - }; +) -> bool { + let mut stream_readers = Vec::with_capacity(files.len()); + + for file in files { + let Ok(arrow_file) = File::open(file) else { return false; }; + let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return false; }; + stream_readers.push(reader); + } + let reader = crate::storage::MergedRecordReader { + readers: stream_readers, + }; + let records = reader.merged_iter(schema).collect(); mem_records.push(records); - let mut file = file.clone(); - file.set_extension("parquet"); - parquet_files.retain(|p| p != &file); + true } diff --git a/server/src/storage.rs b/server/src/storage.rs index e74982fa6..1fb8666be 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -30,8 +30,9 @@ use datafusion::parquet::errors::ParquetError; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fs::create_dir_all; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; mod file_link; @@ -41,6 +42,7 @@ mod s3; mod store_metadata; pub use localfs::{FSConfig, LocalFS}; +pub use object_storage::MergedRecordReader; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::{S3Config, S3}; pub use store_metadata::StorageMetadata; @@ -246,6 +248,48 @@ impl StorageDir { paths } + pub fn arrow_files_grouped_by_time(&self) -> HashMap> { + // hashmap + let mut grouped_arrow_file: HashMap> = HashMap::new(); + let arrow_files = self.arrow_files(); + for arrow_file_path in arrow_files { + let key = Self::arrow_path_to_parquet(&arrow_file_path); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); + } + + grouped_arrow_file + } + + pub fn arrow_files_grouped_exclude_time( + &self, + exclude: NaiveDateTime, + ) -> HashMap> { + let hot_filename = StorageDir::file_time_suffix(exclude); + // hashmap but exclude where hotfilename matches + let mut grouped_arrow_file: HashMap> = HashMap::new(); + let mut arrow_files = self.arrow_files(); + arrow_files.retain(|path| { + !path + .file_name() + .unwrap() + .to_str() + .unwrap() + .ends_with(&hot_filename) + }); + for arrow_file_path in arrow_files { + let key = Self::arrow_path_to_parquet(&arrow_file_path); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); + } + + grouped_arrow_file + } + pub fn parquet_files(&self) -> Vec { let Ok(dir) = self.data_path .read_dir() else { return vec![] }; @@ -255,6 +299,15 @@ impl StorageDir { .filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet"))) .collect() } + + fn arrow_path_to_parquet(path: &Path) -> PathBuf { + let filename = path.file_name().unwrap().to_str().unwrap(); + let (_, filename) = filename.split_once('.').unwrap(); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename); + parquet_path.set_extension("parquet"); + parquet_path + } } #[derive(Debug, thiserror::Error)] diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 51d1be4d3..299770486 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -21,6 +21,7 @@ use std::{ sync::Arc, }; +use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{ @@ -28,6 +29,7 @@ use datafusion::{ file_format::parquet::ParquetFormat, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, }, + error::DataFusionError, execution::runtime_env::{RuntimeConfig, RuntimeEnv}, }; use 
fs_extra::file::{move_file, CopyOptions}; @@ -36,7 +38,7 @@ use relative_path::RelativePath; use tokio::fs; use tokio_stream::wrappers::ReadDirStream; -use crate::{option::validation, query::Query, utils::validate_path_is_writeable}; +use crate::{option::validation, utils::validate_path_is_writeable}; use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider}; @@ -166,9 +168,12 @@ impl ObjectStorage for LocalFS { Ok(()) } - fn query_table(&self, query: &Query) -> Result, ObjectStorageError> { - let prefixes: Vec = query - .get_prefixes() + fn query_table( + &self, + prefixes: Vec, + schema: Arc, + ) -> Result, DataFusionError> { + let prefixes: Vec = prefixes .into_iter() .filter_map(|prefix| { let path = self.root.join(prefix); @@ -191,7 +196,7 @@ impl ObjectStorage for LocalFS { let config = ListingTableConfig::new_with_multi_paths(prefixes) .with_listing_options(listing_options) - .with_schema(Arc::new(query.get_schema().clone())); + .with_schema(schema); Ok(Some(ListingTable::try_new(config)?)) } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index a527cc9da..fe6678137 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -21,7 +21,7 @@ use super::{ Permisssion, StorageDir, StorageMetadata, CACHED_FILES, }; use crate::{ - alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats, + alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, stats::Stats, utils::batch_adapter::adapt_batch, }; @@ -33,6 +33,7 @@ use datafusion::{ array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch, }, datasource::listing::ListingTable, + error::DataFusionError, execution::runtime_env::RuntimeEnv, parquet::{arrow::ArrowWriter, file::properties::WriterProperties}, }; @@ -75,7 +76,11 @@ pub trait ObjectStorage: Sync + 'static { async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; async fn list_streams(&self) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; - fn query_table(&self, query: &Query) -> Result, ObjectStorageError>; + fn query_table( + &self, + prefixes: Vec, + schema: Arc, + ) -> Result, DataFusionError>; async fn put_schema_map( &self, @@ -221,33 +226,10 @@ pub trait ObjectStorage: Sync + 'static { // walk dir, find all .arrows files and convert to parquet // Do not include file which is being written to let time = chrono::Utc::now().naive_utc(); - let hot_filename = StorageDir::file_time_suffix(time); + let staging_files = dir.arrow_files_grouped_exclude_time(time); - let mut arrow_files = dir.arrow_files(); - arrow_files.retain(|file| { - !file - .file_name() - .expect("is a not none filename") - .to_str() - .expect("valid unicode") - .ends_with(&hot_filename) - }); - - // hashmap - let mut grouped_arrow_file: HashMap<&str, Vec<&Path>> = HashMap::new(); - for file in &arrow_files { - let key = file - .file_name() - .expect("is a not none filename") - .to_str() - .unwrap(); - let (_, key) = key.split_once('.').unwrap(); - grouped_arrow_file.entry(key).or_default().push(file); - } - - for files in grouped_arrow_file.values() { - let parquet_path = get_parquet_path(files[0]); - let record_reader = MergedRecordReader::try_new(files).unwrap(); + for (parquet_path, files) in staging_files { + let record_reader = MergedRecordReader::try_new(&files).unwrap(); let mut parquet_table = CACHED_FILES.lock().unwrap(); let parquet_file = @@ -347,24 
+329,13 @@ pub trait ObjectStorage: Sync + 'static { } } -fn get_parquet_path(file: &Path) -> PathBuf { - let filename = file.file_name().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); - - let mut parquet_path = file.to_owned(); - parquet_path.set_file_name(filename); - parquet_path.set_extension("parquet"); - - parquet_path -} - #[derive(Debug)] -struct MergedRecordReader { - readers: Vec>, +pub struct MergedRecordReader { + pub readers: Vec>, } impl MergedRecordReader { - fn try_new(files: &[&Path]) -> Result { + pub fn try_new(files: &[PathBuf]) -> Result { let mut readers = Vec::with_capacity(files.len()); for file in files { @@ -375,7 +346,7 @@ impl MergedRecordReader { Ok(Self { readers }) } - fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { + pub fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { let adapted_readers = self .readers .into_iter() @@ -398,7 +369,7 @@ impl MergedRecordReader { }) } - fn merged_schema(&self) -> Schema { + pub fn merged_schema(&self) -> Schema { Schema::try_merge( self.readers .iter() diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index ba1eba3b3..8775b6ef8 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -16,6 +16,7 @@ * */ +use arrow_schema::Schema; use async_trait::async_trait; use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind}; use aws_sdk_s3::model::{CommonPrefix, Delete, ObjectIdentifier}; @@ -32,6 +33,7 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::datasource::object_store::ObjectStoreRegistry; +use datafusion::error::DataFusionError; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use futures::StreamExt; use http::Uri; @@ -44,8 +46,6 @@ use std::iter::Iterator; use std::path::Path; use std::sync::Arc; -use crate::query::Query; - use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; use super::ObjectStorageProvider; @@ -319,10 +319,13 @@ impl ObjectStorage for S3 { Ok(()) } - fn query_table(&self, query: &Query) -> Result, ObjectStorageError> { + fn query_table( + &self, + prefixes: Vec, + schema: Arc, + ) -> Result, DataFusionError> { // Get all prefix paths and convert them into futures which yeilds ListingTableUrl - let prefixes: Vec = query - .get_prefixes() + let prefixes: Vec = prefixes .into_iter() .map(|prefix| { let path = format!("s3://{}/{}", &self.bucket, prefix); @@ -345,7 +348,7 @@ impl ObjectStorage for S3 { let config = ListingTableConfig::new_with_multi_paths(prefixes) .with_listing_options(listing_options) - .with_schema(Arc::new(query.get_schema().clone())); + .with_schema(schema); Ok(Some(ListingTable::try_new(config)?)) } From 6ff51460a58fddacf6a2fed287f4ab2f81dbd3b7 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 9 Feb 2023 14:01:58 +0530 Subject: [PATCH 20/22] Fix test --- server/src/query.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index 0f8a1d1b7..abb25d1fb 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -145,7 +145,6 @@ fn time_from_path(path: &Path) -> DateTime { .to_str() .expect("filename is valid"); - // split by . and skip first part because that is schema key. 
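// Illustrative sketch, not part of this patch: with the schema-key prefix removed from
// staging file names, the leading dot-separated components are the date, hour and minute
// markers. A standalone version of that parsing (assuming the chrono crate; the helper
// name is hypothetical) could look like this:
use chrono::{DateTime, NaiveDate, TimeZone, Utc};

fn parse_staging_time(filename: &str) -> Option<DateTime<Utc>> {
    // e.g. "date=2022-01-01.hour=00.minute=00.hostname.data.parquet"
    let mut parts = filename.splitn(4, '.');
    let date = parts.next()?.strip_prefix("date=")?;
    let hour: u32 = parts.next()?.strip_prefix("hour=")?.parse().ok()?;
    let minute: u32 = parts.next()?.strip_prefix("minute=")?.parse().ok()?;
    let date = NaiveDate::parse_from_str(date, "%Y-%m-%d").ok()?;
    let naive = date.and_hms_opt(hour, minute, 0)?;
    Some(Utc.from_utc_datetime(&naive))
}

// parse_staging_time("date=2022-01-01.hour=00.minute=00.hostname.data.parquet")
// yields 2022-01-01T00:00:00Z (unix timestamp 1640995200), matching the test below.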
// Next three in order will be date, hour and minute let mut components = prefix.splitn(3, '.'); @@ -200,9 +199,7 @@ mod tests { #[test] fn test_time_from_parquet_path() { - let path = PathBuf::from( - "schema_key_rlength.date=2022-01-01.hour=00.minute=00.hostname.data.parquet", - ); + let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet"); let time = time_from_path(path.as_path()); assert_eq!(time.timestamp(), 1640995200); } From 79b54246560832a4d328c6fa6c3e437621e1be1c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 9 Feb 2023 16:19:53 +0530 Subject: [PATCH 21/22] Fix import --- server/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index fe274c5a3..ab175a7c9 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -65,9 +65,9 @@ actix-web-static-files = "4.0" static-files = "0.2.1" ulid = { version = "1.0", features = ["serde"] } ureq = { version = "2.5.0", features = ["json"] } -dashmap = "5.4.0" hex = "0.4.3" itertools = "0.10.5" +xxhash-rust = { version = "0.8.6", features = ["xxh3"] } [build-dependencies] static-files = "0.2.1" @@ -80,7 +80,6 @@ zip = { version = "0.6.3", default_features = false, features = ["deflate"] } [dev-dependencies] maplit = "1.0.2" rstest = "0.15.0" -serial_test = { version = "0.9.0", default-features = false } [package.metadata.parseable_ui] assets-url = "https://github.com/parseablehq/console/releases/download/v0.0.8/build.zip" From bed5a09ed6c373441d20ff0ca9cde65dac03e924 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 9 Feb 2023 16:20:24 +0530 Subject: [PATCH 22/22] Use new hasher --- server/src/event.rs | 7 +++---- server/src/migration/schema_migration.rs | 9 +++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 7b39eeb1a..0941e6a0f 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -26,7 +26,6 @@ use datafusion::arrow::datatypes::Schema; use datafusion::arrow::error::ArrowError; use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions}; use datafusion::arrow::record_batch::RecordBatch; -use md5::Digest; use serde_json::Value; use std::collections::HashMap; @@ -147,12 +146,12 @@ fn add_default_timestamp_field(schema: Schema) -> Result { pub fn get_schema_key(body: &Value) -> String { let mut list_of_fields: Vec<_> = body.as_object().unwrap().keys().collect(); list_of_fields.sort(); - let mut hasher = md5::Md5::new(); + let mut hasher = xxhash_rust::xxh3::Xxh3::new(); for field in list_of_fields { hasher.update(field.as_bytes()) } - - hex::encode(hasher.finalize()) + let hash = hasher.digest(); + format!("{hash:x}") } fn fields_mismatch(schema: &Schema, body: &Value) -> bool { diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs index 2aa6937a6..18a301ec3 100644 --- a/server/src/migration/schema_migration.rs +++ b/server/src/migration/schema_migration.rs @@ -21,7 +21,6 @@ use std::collections::HashMap; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use itertools::Itertools; -use md5::Digest; pub(super) fn v1_v2(schema: Option) -> anyhow::Result> { let Some(schema) = schema else { return Ok(HashMap::new()) }; @@ -41,11 +40,13 @@ pub(super) fn v1_v2(schema: Option) -> anyhow::Result
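// Illustrative sketch, not part of this patch series: the final commit derives the schema
// key as a stable xxh3 hash of the sorted field names, so two events with the same set of
// keys map to the same staging file regardless of field order. A standalone version of the
// same idea (assuming the xxhash-rust crate with the "xxh3" feature and serde_json; the
// function name is hypothetical):
use serde_json::json;

fn schema_key(body: &serde_json::Value) -> String {
    // Sort field names so the hash is independent of JSON key order.
    let mut fields: Vec<&String> = body.as_object().expect("flat json object").keys().collect();
    fields.sort();

    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
    for field in fields {
        hasher.update(field.as_bytes());
    }
    // Hex-encode the 64-bit digest, mirroring the `format!("{hash:x}")` used above.
    format!("{:x}", hasher.digest())
}

fn main() {
    // Field order does not matter; both events share one schema key and hence one file.
    let a = json!({"level": "info", "msg": "ok"});
    let b = json!({"msg": "ok", "level": "info"});
    assert_eq!(schema_key(&a), schema_key(&b));
}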