src/alerts/mod.rs (2 additions, 1 deletion)
@@ -742,7 +742,8 @@ impl Alerts {
outbox_tx,
)?;

self.update_task(alert.id, handle, outbox_rx, inbox_tx).await;
self.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;

map.insert(alert.id, alert);
}
src/handlers/http/modal/ingest_server.rs (1 addition, 1 deletion)
@@ -213,7 +213,7 @@ impl ParseableServer for IngestServer {
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

result
}
}
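The ingest server hunk keeps a deliberate ordering on shutdown: capture the outcome of `self.start(...)`, signal the background sync jobs to stop, and only then return the result so a startup or shutdown error still reaches the caller. A minimal, self-contained sketch of that pattern; the `tokio::sync::oneshot` channel and the stand-in `run_server` are assumptions for illustration, not Parseable's actual types.

use tokio::sync::oneshot;

async fn run_server() -> Result<(), std::io::Error> {
    // Stand-in for `self.start(shutdown_rx, prometheus.clone(), None)`.
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let (cancel_tx, cancel_rx) = oneshot::channel::<()>();

    // Hypothetical background sync loop that exits once the signal arrives.
    let sync_task = tokio::spawn(async move {
        let _ = cancel_rx.await;
    });

    // Capture the server result first ...
    let result = run_server().await;

    // ... then cancel sync jobs, and finally propagate the result.
    cancel_tx.send(()).expect("Cancellation should not fail");
    let _ = sync_task.await;

    result
}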
src/handlers/http/modal/mod.rs (2 additions, 1 deletion)
@@ -49,6 +49,7 @@ use super::API_VERSION;
use crate::handlers::http::health_check;
use crate::oidc;
use crate::option::CONFIG;
use crate::staging::STAGING;

pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>;

@@ -140,7 +141,7 @@ pub trait ParseableServer {
// Perform S3 sync and wait for completion
info!("Starting data sync to S3...");

if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
if let Err(e) = STAGING.prepare_parquet(true) {
warn!("Failed to convert arrow files to parquet. {:?}", e);
} else {
info!("Successfully converted arrow files to parquet.");
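The behavioural shift here is that shutdown-time conversion no longer goes through the async object-store path: `STAGING.prepare_parquet(true)` runs synchronously, and per the doc comment added in src/staging/streams.rs, `true` means even the current minute's arrow files are flushed, while the periodic sync passes `false` and only converts past minutes. A rough sketch of that flag's semantics; the minute truncation and the helper below are assumptions for illustration, not the actual file-selection code.

use chrono::{DateTime, Duration, DurationRound, Utc};

// Is an arrow file from the given minute eligible for parquet conversion?
fn should_convert(file_minute: DateTime<Utc>, shutdown_signal: bool) -> bool {
    let current_minute = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
    // During normal sync only past minutes are converted; at shutdown everything is.
    shutdown_signal || file_minute < current_minute
}

fn main() {
    let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
    let previous = now - Duration::minutes(1);

    assert!(should_convert(previous, false)); // past minute: converted by periodic sync
    assert!(!should_convert(now, false));     // current minute: still being written to
    assert!(should_convert(now, true));       // shutdown: flush everything
}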
src/staging/mod.rs (4 additions, 0 deletions)
@@ -20,6 +20,8 @@
use once_cell::sync::Lazy;
pub use streams::{Stream, Streams};

use crate::metadata::error::stream_info::MetadataError;

mod reader;
mod streams;
mod writer;
@@ -34,6 +36,8 @@ pub enum StagingError {
ObjectStorage(#[from] std::io::Error),
#[error("Could not generate parquet file")]
Create,
#[error("Metadata Error: {0}")]
Metadata(#[from] MetadataError),
}

/// Staging is made up of multiple streams, each stream's context is housed in a single `Stream` object.
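The new `Metadata(#[from] MetadataError)` variant is what lets the staging code use `?` directly on `STREAM_INFO` lookups, instead of boxing errors the way the old object-storage path did. A compilable sketch of that conversion with stand-in types, not Parseable's real `MetadataError` or stream-info API:

use thiserror::Error;

#[derive(Debug, Error)]
#[error("Metadata not found for stream")]
struct MetadataError;

#[derive(Debug, Error)]
enum StagingError {
    #[error("Metadata Error: {0}")]
    Metadata(#[from] MetadataError),
}

// Stand-in for STREAM_INFO.get_time_partition(&stream_name).
fn get_time_partition(stream: &str) -> Result<Option<String>, MetadataError> {
    if stream == "logs" { Ok(None) } else { Err(MetadataError) }
}

fn prepare_parquet(stream: &str) -> Result<(), StagingError> {
    // `?` compiles because `#[from]` generates `From<MetadataError> for StagingError`.
    let _time_partition = get_time_partition(stream)?;
    Ok(())
}

fn main() {
    assert!(prepare_parquet("logs").is_ok());
    assert!(matches!(prepare_parquet("missing"), Err(StagingError::Metadata(_))));
}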
src/staging/streams.rs (76 additions, 3 deletions)
@@ -19,7 +19,7 @@

use std::{
collections::HashMap,
fs::{remove_file, File, OpenOptions},
fs::{self, remove_file, File, OpenOptions},
path::{Path, PathBuf},
process,
sync::{Arc, Mutex, RwLock},
@@ -42,15 +42,17 @@ use parquet::{
schema::types::ColumnPath,
};
use rand::distributions::DistString;
use tracing::{error, trace};
use relative_path::RelativePathBuf;
use tracing::{error, info, trace, warn};

use crate::{
cli::Options,
event::DEFAULT_TIMESTAMP_KEY,
handlers::http::modal::ingest_server::INGESTOR_META,
metadata::{LOCK_EXPECT, STREAM_INFO},
metrics,
option::{Mode, CONFIG},
storage::{StreamType, OBJECT_STORE_DATA_GRANULARITY},
storage::{object_storage::to_bytes, StreamType, OBJECT_STORE_DATA_GRANULARITY},
utils::minute_to_slot,
};

@@ -278,6 +280,62 @@ impl<'a> Stream<'a> {
parquet_path
}

/// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
info!(
"Starting arrow_conversion job for stream- {}",
self.stream_name
);

let time_partition = STREAM_INFO.get_time_partition(&self.stream_name)?;
let custom_partition = STREAM_INFO.get_custom_partition(&self.stream_name)?;

// read arrow files on disk
// convert them to parquet
let schema = self
.convert_disk_files_to_parquet(
time_partition.as_ref(),
custom_partition.as_ref(),
shutdown_signal,
)
.inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;

// check if there is already a schema file in staging pertaining to this stream
// if yes, then merge them and save

if let Some(schema) = schema {
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&self.stream_name)?;
if !static_schema_flag {
// schema is dynamic, read from staging and merge if present

// need to add something before .schema to make the file have an extension of type `schema`
let path = RelativePathBuf::from_iter([format!("{}.schema", self.stream_name)])
.to_path(&self.data_path);

let staging_schemas = self.get_schemas_if_present();
if let Some(mut staging_schemas) = staging_schemas {
warn!(
"Found {} schemas in staging for stream- {}",
staging_schemas.len(),
self.stream_name
);
staging_schemas.push(schema);
let merged_schema = Schema::try_merge(staging_schemas)?;

warn!("writing merged schema to path- {path:?}");
// save the merged schema on staging disk
// the path should be stream/.ingestor.id.schema
fs::write(path, to_bytes(&merged_schema))?;
} else {
info!("writing single schema to path- {path:?}");
fs::write(path, to_bytes(&schema))?;
}
}
}

Ok(())
}

pub fn recordbatches_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
self.writer.lock().unwrap().mem.recordbatch_cloned(schema)
}
@@ -506,6 +564,21 @@ impl Streams {
staging.flush()
}
}

/// Convert arrow files into parquet, preparing it for upload
pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
if !Path::new(&CONFIG.staging_dir()).exists() {
return Ok(());
}

for stream in self.read().expect(LOCK_EXPECT).values() {
stream
.prepare_parquet(shutdown_signal)
.inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
}

Ok(())
}
}

#[cfg(test)]
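Most of this file is the relocated conversion logic: `Stream::prepare_parquet` converts the on-disk arrow files, then, for dynamic-schema streams only, merges any schema files already sitting in staging with the newly inferred schema and writes the result to `<staging>/<stream_name>.schema`; `Streams::prepare_parquet` fans that out over every known stream. The merge step itself is plain `arrow_schema::Schema::try_merge`; a small sketch with made-up fields (the real code serialises with `object_storage::to_bytes` rather than `Debug`):

use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // A schema left behind in staging from an earlier run ...
    let staged = Schema::new(vec![Field::new("p_timestamp", DataType::Utf8, true)]);
    // ... and the schema inferred from the arrow files just converted.
    let inferred = Schema::new(vec![
        Field::new("p_timestamp", DataType::Utf8, true),
        Field::new("level", DataType::Utf8, true),
    ]);

    // Mirrors `Schema::try_merge(staging_schemas)?` in the diff; conflicting
    // field types would surface here as an ArrowError.
    let merged = Schema::try_merge(vec![staged, inferred])?;
    assert_eq!(merged.fields().len(), 2);
    println!("{merged:?}");
    Ok(())
}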
src/storage/object_storage.rs (0 additions, 87 deletions)
@@ -46,8 +46,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Local, Utc};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
@@ -748,91 +746,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {

Ok(())
}

async fn conversion(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
if !Path::new(&CONFIG.staging_dir()).exists() {
return Ok(());
}

let mut conversion_tasks = FuturesUnordered::new();
for stream in STREAM_INFO.list_streams() {
conversion_tasks.push(conversion_for_stream(stream, shutdown_signal));
}

while let Some(res) = conversion_tasks.next().await {
if let Err(err) = res {
error!("Failed to run conversion task {err:?}");
return Err(err);
}
}

Ok(())
}
}

async fn conversion_for_stream(
stream: String,
shutdown_signal: bool,
) -> Result<(), ObjectStorageError> {
info!("Starting arrow_conversion job for stream- {stream}");

let time_partition = STREAM_INFO
.get_time_partition(&stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let custom_partition = STREAM_INFO
.get_custom_partition(&stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let stage = STAGING.get_or_create_stream(&stream);

// read arrow files on disk
// convert them to parquet
let schema = stage
.convert_disk_files_to_parquet(
time_partition.as_ref(),
custom_partition.as_ref(),
shutdown_signal,
)
.map_err(|err| {
warn!("Error while converting arrow to parquet- {err:?}");
ObjectStorageError::UnhandledError(Box::new(err))
})?;

// check if there is already a schema file in staging pertaining to this stream
// if yes, then merge them and save

if let Some(schema) = schema {
let static_schema_flag = STREAM_INFO
.get_static_schema_flag(&stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
if !static_schema_flag {
// schema is dynamic, read from staging and merge if present

// need to add something before .schema to make the file have an extension of type `schema`
let path =
RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&stage.data_path);

let staging_schemas = stage.get_schemas_if_present();
if let Some(mut staging_schemas) = staging_schemas {
warn!(
"Found {} schemas in staging for stream- {stream}",
staging_schemas.len()
);
staging_schemas.push(schema);
let merged_schema = Schema::try_merge(staging_schemas)
.map_err(|e| ObjectStorageError::Custom(e.to_string()))?;

warn!("writing merged schema to path- {path:?}");
// save the merged schema on staging disk
// the path should be stream/.ingestor.id.schema
fs::write(path, to_bytes(&merged_schema))?;
} else {
info!("writing single schema to path- {path:?}");
fs::write(path, to_bytes(&schema))?;
}
}
}

Ok(())
}

pub async fn commit_schema_to_storage(
src/sync.rs (1 addition, 1 deletion)
@@ -209,7 +209,7 @@ pub fn arrow_conversion() -> (
if let Err(e) = monitor_task_duration(
"arrow_conversion",
Duration::from_secs(30),
|| async { CONFIG.storage().get_object_store().conversion(false).await },
|| async { STAGING.prepare_parquet(false) },
).await
{
warn!("failed to convert local arrow data to parquet. {e:?}");
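The periodic sync loop makes the same call with `shutdown_signal = false`; because `prepare_parquet` is synchronous, it is wrapped in an `async` block only to satisfy the existing `monitor_task_duration` helper. A simplified stand-in for that helper (Parseable's real implementation may differ) to show the shape of the call:

use std::{future::Future, time::Duration};
use tokio::time::Instant;

async fn monitor_task_duration<F, Fut, T>(name: &str, threshold: Duration, f: F) -> T
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = T>,
{
    let start = Instant::now();
    let out = f().await;
    if start.elapsed() > threshold {
        eprintln!("task {name} exceeded {threshold:?}");
    }
    out
}

// Stand-in for STAGING.prepare_parquet(false).
fn prepare_parquet(_shutdown_signal: bool) -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    if let Err(e) =
        monitor_task_duration("arrow_conversion", Duration::from_secs(30), || async {
            prepare_parquet(false)
        })
        .await
    {
        eprintln!("failed to convert local arrow data to parquet. {e:?}");
    }
}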