From 9767144f8403bf4c1f3db54635f0b985871d361e Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sun, 9 Feb 2025 20:36:32 +0530
Subject: [PATCH] refactor: associate conversion with STAGING, not storage

---
 src/handlers/http/modal/mod.rs |  3 +-
 src/staging/mod.rs             |  4 ++
 src/staging/streams.rs         | 79 ++++++++++++++++++++++++++++--
 src/storage/object_storage.rs  | 87 ----------------------------------
 src/sync.rs                    |  2 +-
 5 files changed, 83 insertions(+), 92 deletions(-)

diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs
index bf62a1885..6cc1ec0c9 100644
--- a/src/handlers/http/modal/mod.rs
+++ b/src/handlers/http/modal/mod.rs
@@ -49,6 +49,7 @@ use super::API_VERSION;
 use crate::handlers::http::health_check;
 use crate::oidc;
 use crate::option::CONFIG;
+use crate::staging::STAGING;
 
 pub type OpenIdClient = Arc>;
 
@@ -140,7 +141,7 @@ pub trait ParseableServer {
 
         // Perform S3 sync and wait for completion
         info!("Starting data sync to S3...");
-        if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
+        if let Err(e) = STAGING.prepare_parquet(true) {
             warn!("Failed to convert arrow files to parquet. {:?}", e);
         } else {
             info!("Successfully converted arrow files to parquet.");
diff --git a/src/staging/mod.rs b/src/staging/mod.rs
index 72d30640d..f7b81b9a9 100644
--- a/src/staging/mod.rs
+++ b/src/staging/mod.rs
@@ -20,6 +20,8 @@
 use once_cell::sync::Lazy;
 pub use streams::{Stream, Streams};
 
+use crate::metadata::error::stream_info::MetadataError;
+
 mod reader;
 mod streams;
 mod writer;
@@ -34,6 +36,8 @@ pub enum StagingError {
     ObjectStorage(#[from] std::io::Error),
     #[error("Could not generate parquet file")]
     Create,
+    #[error("Metadata Error: {0}")]
+    Metadata(#[from] MetadataError),
 }
 
 /// Staging is made up of multiple streams, each stream's context is housed in a single `Stream` object.
diff --git a/src/staging/streams.rs b/src/staging/streams.rs
index e950ed6dc..365f117a5 100644
--- a/src/staging/streams.rs
+++ b/src/staging/streams.rs
@@ -19,7 +19,7 @@ use std::{
     collections::HashMap,
-    fs::{remove_file, File, OpenOptions},
+    fs::{self, remove_file, File, OpenOptions},
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
@@ -39,15 +39,17 @@ use parquet::{
     schema::types::ColumnPath,
 };
 use rand::distributions::DistString;
-use tracing::error;
+use relative_path::RelativePathBuf;
+use tracing::{error, info, warn};
 
 use crate::{
     cli::Options,
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::http::modal::ingest_server::INGESTOR_META,
+    metadata::{LOCK_EXPECT, STREAM_INFO},
     metrics,
     option::{Mode, CONFIG},
-    storage::{StreamType, OBJECT_STORE_DATA_GRANULARITY},
+    storage::{object_storage::to_bytes, StreamType, OBJECT_STORE_DATA_GRANULARITY},
     utils::minute_to_slot,
 };
 
@@ -272,6 +274,62 @@ impl<'a> Stream<'a> {
         parquet_path
     }
 
+    /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
+    fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        info!(
+            "Starting arrow_conversion job for stream- {}",
+            self.stream_name
+        );
+
+        let time_partition = STREAM_INFO.get_time_partition(&self.stream_name)?;
+        let custom_partition = STREAM_INFO.get_custom_partition(&self.stream_name)?;
+
+        // read arrow files on disk
+        // convert them to parquet
+        let schema = self
+            .convert_disk_files_to_parquet(
+                time_partition.as_ref(),
+                custom_partition.as_ref(),
+                shutdown_signal,
+            )
+            .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;
+
+        // check if there is already a schema file in staging pertaining to this stream
+        // if yes, then merge them and save
+
+        if let Some(schema) = schema {
+            let static_schema_flag = STREAM_INFO.get_static_schema_flag(&self.stream_name)?;
+            if !static_schema_flag {
+                // schema is dynamic, read from staging and merge if present
+
+                // need to add something before .schema to make the file have an extension of type `schema`
+                let path = RelativePathBuf::from_iter([format!("{}.schema", self.stream_name)])
+                    .to_path(&self.data_path);
+
+                let staging_schemas = self.get_schemas_if_present();
+                if let Some(mut staging_schemas) = staging_schemas {
+                    warn!(
+                        "Found {} schemas in staging for stream- {}",
+                        staging_schemas.len(),
+                        self.stream_name
+                    );
+                    staging_schemas.push(schema);
+                    let merged_schema = Schema::try_merge(staging_schemas)?;
+
+                    warn!("writing merged schema to path- {path:?}");
+                    // save the merged schema on staging disk
+                    // the path should be stream/.ingestor.id.schema
+                    fs::write(path, to_bytes(&merged_schema))?;
+                } else {
+                    info!("writing single schema to path- {path:?}");
+                    fs::write(path, to_bytes(&schema))?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     pub fn recordbatches_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
         self.writer.lock().unwrap().mem.recordbatch_cloned(schema)
     }
@@ -492,6 +550,21 @@ impl Streams {
             staging.flush()
         }
     }
+
+    /// Convert arrow files into parquet, preparing it for upload
+    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        if !Path::new(&CONFIG.staging_dir()).exists() {
+            return Ok(());
+        }
+
+        for stream in self.read().expect(LOCK_EXPECT).values() {
+            stream
+                .prepare_parquet(shutdown_signal)
+                .inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 8994f06a4..d290786d8 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -46,8 +46,6 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Local, Utc};
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
@@ -748,91 +746,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         Ok(())
     }
-
-    async fn conversion(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
-        if !Path::new(&CONFIG.staging_dir()).exists() {
-            return Ok(());
-        }
-
-        let mut conversion_tasks = FuturesUnordered::new();
-        for stream in STREAM_INFO.list_streams() {
-            conversion_tasks.push(conversion_for_stream(stream, shutdown_signal));
-        }
-
-        while let Some(res) = conversion_tasks.next().await {
-            if let Err(err) = res {
-                error!("Failed to run conversion task {err:?}");
-                return Err(err);
-            }
-        }
-
-        Ok(())
-    }
-}
-
-async fn conversion_for_stream(
-    stream: String,
-    shutdown_signal: bool,
-) -> Result<(), ObjectStorageError> {
-    info!("Starting arrow_conversion job for stream- {stream}");
-
-    let time_partition = STREAM_INFO
-        .get_time_partition(&stream)
-        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-    let custom_partition = STREAM_INFO
-        .get_custom_partition(&stream)
-        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-    let stage = STAGING.get_or_create_stream(&stream);
-
-    // read arrow files on disk
-    // convert them to parquet
-    let schema = stage
-        .convert_disk_files_to_parquet(
-            time_partition.as_ref(),
-            custom_partition.as_ref(),
-            shutdown_signal,
-        )
-        .map_err(|err| {
-            warn!("Error while converting arrow to parquet- {err:?}");
-            ObjectStorageError::UnhandledError(Box::new(err))
-        })?;
-
-    // check if there is already a schema file in staging pertaining to this stream
-    // if yes, then merge them and save
-
-    if let Some(schema) = schema {
-        let static_schema_flag = STREAM_INFO
-            .get_static_schema_flag(&stream)
-            .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-        if !static_schema_flag {
-            // schema is dynamic, read from staging and merge if present
-
-            // need to add something before .schema to make the file have an extension of type `schema`
-            let path =
-                RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&stage.data_path);
-
-            let staging_schemas = stage.get_schemas_if_present();
-            if let Some(mut staging_schemas) = staging_schemas {
-                warn!(
-                    "Found {} schemas in staging for stream- {stream}",
-                    staging_schemas.len()
-                );
-                staging_schemas.push(schema);
-                let merged_schema = Schema::try_merge(staging_schemas)
-                    .map_err(|e| ObjectStorageError::Custom(e.to_string()))?;
-
-                warn!("writing merged schema to path- {path:?}");
-                // save the merged schema on staging disk
-                // the path should be stream/.ingestor.id.schema
-                fs::write(path, to_bytes(&merged_schema))?;
-            } else {
-                info!("writing single schema to path- {path:?}");
-                fs::write(path, to_bytes(&schema))?;
-            }
-        }
-    }
-
-    Ok(())
-}
 
 pub async fn commit_schema_to_storage(
diff --git a/src/sync.rs b/src/sync.rs
index 1eceb21bc..d03242f87 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -160,7 +160,7 @@ pub async fn arrow_conversion() -> (
                 if let Err(e) = monitor_task_duration(
                     "arrow_conversion",
                     Duration::from_secs(30),
-                    || async { CONFIG.storage().get_object_store().conversion(false).await },
+                    || async { STAGING.prepare_parquet(false) },
                 ).await
                 {
                     warn!("failed to convert local arrow data to parquet. {e:?}");