4 changes: 2 additions & 2 deletions src/alerts/mod.rs
@@ -330,7 +330,7 @@ pub struct RollingWindow {
// should always be "now"
pub eval_end: String,
// x minutes (5m)
pub eval_frequency: u32,
pub eval_frequency: u64,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
@@ -641,7 +641,7 @@ impl AlertConfig {
columns
}

pub fn get_eval_frequency(&self) -> u32 {
pub fn get_eval_frequency(&self) -> u64 {
match &self.eval_type {
EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
}
2 changes: 0 additions & 2 deletions src/handlers/http/logstream.rs
@@ -103,8 +103,6 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
.unwrap()
.into_iter()
.filter(|logstream| {
warn!("logstream-\n{logstream:?}");

Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
== crate::rbac::Response::Authorized
})
16 changes: 16 additions & 0 deletions src/handlers/http/modal/ingest_server.rs
@@ -203,6 +203,11 @@ impl ParseableServer for IngestServer {
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;
let (
mut remote_conversion_handler,
mut remote_conversion_outbox,
mut remote_conversion_inbox,
) = sync::arrow_conversion().await;

tokio::spawn(airplane::server());

@@ -219,12 +224,16 @@ impl ParseableServer for IngestServer {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
@@ -238,6 +247,13 @@ impl ParseableServer for IngestServer {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
}

}
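Note: the restart logic above relies on `sync::arrow_conversion()` returning a `(handler, outbox, inbox)` triple that the `select!` loop can shut down or respawn. The PR does not show that function, so the following is only a rough sketch of how such a triple could be wired with tokio oneshot channels; the channel layout, the 60-second tick, and the `do_conversion` stub are assumptions for illustration, not Parseable's actual implementation.

```rust
use std::time::Duration;

use tokio::sync::oneshot;
use tokio::task::JoinHandle;

/// Hypothetical shape of sync::arrow_conversion(): spawn a periodic conversion
/// task and hand back (join handle, failure signal, shutdown signal).
pub async fn arrow_conversion() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
    // inbox: the caller sends () here to request shutdown
    let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
    // outbox: the task signals here when it stops on its own (e.g. on failure)
    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();

    let handle = tokio::spawn(async move {
        // e.g. STORAGE_CONVERSION_INTERVAL seconds between conversion passes
        let mut ticker = tokio::time::interval(Duration::from_secs(60));
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    // run one arrow-to-parquet pass; on failure, leave the loop
                    if do_conversion().await.is_err() {
                        break;
                    }
                }
                _ = &mut inbox_rx => {
                    // caller asked us to shut down; exit without signalling back
                    return;
                }
            }
        }
        // tell the caller the task stopped on its own (recoverable failure)
        let _ = outbox_tx.send(());
    });

    (handle, outbox_rx, inbox_tx)
}

// placeholder for the real arrow-to-parquet conversion pass
async fn do_conversion() -> Result<(), ()> {
    Ok(())
}
```

Under a shape like this, the `remote_conversion_outbox` branch in the server loops above can recover simply by awaiting the old handle and calling `arrow_conversion()` again.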
14 changes: 13 additions & 1 deletion src/handlers/http/modal/mod.rs
@@ -139,7 +139,19 @@ pub trait ParseableServer {

// Perform S3 sync and wait for completion
info!("Starting data sync to S3...");
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {

if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
warn!("Failed to convert arrow files to parquet. {:?}", e);
} else {
info!("Successfully converted arrow files to parquet.");
}

if let Err(e) = CONFIG
.storage()
.get_object_store()
.upload_files_from_staging()
.await
{
warn!("Failed to sync local data with object store. {:?}", e);
} else {
info!("Successfully synced all data to S3.");
17 changes: 16 additions & 1 deletion src/handlers/http/modal/server.rs
@@ -134,7 +134,11 @@ impl ParseableServer for Server {
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;

let (
mut remote_conversion_handler,
mut remote_conversion_outbox,
mut remote_conversion_inbox,
) = sync::arrow_conversion().await;
if CONFIG.options.send_analytics {
analytics::init_analytics_scheduler()?;
}
@@ -152,12 +156,16 @@ impl ParseableServer for Server {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
@@ -171,6 +179,13 @@ impl ParseableServer for Server {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
}

};
3 changes: 2 additions & 1 deletion src/lib.rs
@@ -56,7 +56,8 @@ pub use handlers::http::modal::{
use once_cell::sync::Lazy;
use reqwest::{Client, ClientBuilder};

pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
pub const STORAGE_CONVERSION_INTERVAL: u32 = 60;
pub const STORAGE_UPLOAD_INTERVAL: u32 = 30;

// A single HTTP client for all outgoing HTTP requests from the parseable server
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
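The split into two constants suggests conversion and upload now run on separate timers: conversion every 60 seconds, upload every 30 seconds. A minimal illustration of that scheduling, assuming tokio intervals; the loop bodies are stubs, not Parseable code.

```rust
use std::time::Duration;

pub const STORAGE_CONVERSION_INTERVAL: u32 = 60;
pub const STORAGE_UPLOAD_INTERVAL: u32 = 30;

#[tokio::main]
async fn main() {
    let mut convert = tokio::time::interval(Duration::from_secs(STORAGE_CONVERSION_INTERVAL as u64));
    let mut upload = tokio::time::interval(Duration::from_secs(STORAGE_UPLOAD_INTERVAL as u64));

    loop {
        tokio::select! {
            // every STORAGE_CONVERSION_INTERVAL: turn staged arrow files into parquet
            _ = convert.tick() => { /* conversion pass (stub) */ }
            // every STORAGE_UPLOAD_INTERVAL: push finished parquet files to object storage
            _ = upload.tick() => { /* upload pass (stub) */ }
        }
    }
}
```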
61 changes: 55 additions & 6 deletions src/staging/streams.rs
@@ -19,7 +19,7 @@

use std::{
collections::HashMap,
fs::{remove_file, OpenOptions},
fs::{remove_file, File, OpenOptions},
path::{Path, PathBuf},
process,
sync::{Arc, Mutex, RwLock},
@@ -165,6 +165,11 @@ impl<'a> Stream<'a> {
paths
}

/// Groups arrow files which are to be included in one parquet
///
/// Excludes the arrow file being written for the current minute (data is still being written to that one)
///
/// Only includes ones starting from the previous minute
pub fn arrow_files_grouped_exclude_time(
&self,
exclude: NaiveDateTime,
@@ -173,6 +178,8 @@ impl<'a> Stream<'a> {
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
let mut arrow_files = self.arrow_files();

// if the shutdown signal is false i.e. normal condition
// don't keep the ones for the current minute
if !shutdown_signal {
arrow_files.retain(|path| {
!path
@@ -215,6 +222,45 @@ impl<'a> Stream<'a> {
.collect()
}

pub fn schema_files(&self) -> Vec<PathBuf> {
let Ok(dir) = self.data_path.read_dir() else {
return vec![];
};

dir.flatten()
.map(|file| file.path())
.filter(|file| file.extension().is_some_and(|ext| ext.eq("schema")))
.collect()
}

pub fn get_schemas_if_present(&self) -> Option<Vec<Schema>> {
let Ok(dir) = self.data_path.read_dir() else {
return None;
};

let mut schemas: Vec<Schema> = Vec::new();

for file in dir.flatten() {
if let Some(ext) = file.path().extension() {
if ext.eq("schema") {
let file = File::open(file.path()).expect("Schema File should exist");

let schema = match serde_json::from_reader(file) {
Ok(schema) => schema,
Err(_) => continue,
};
schemas.push(schema);
}
}
}

if !schemas.is_empty() {
Some(schemas)
} else {
None
}
}

fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
let filename = path.file_stem().unwrap().to_str().unwrap();
let (_, filename) = filename.split_once('.').unwrap();
@@ -249,6 +295,9 @@ impl<'a> Stream<'a> {
}
}

/// This function reads arrow files, groups their schemas
///
/// converts them into parquet files and returns a merged schema
pub fn convert_disk_files_to_parquet(
&self,
time_partition: Option<&String>,
@@ -272,12 +321,12 @@ impl<'a> Stream<'a> {
}

// warn!("staging files-\n{staging_files:?}\n");
for (parquet_path, files) in staging_files {
for (parquet_path, arrow_files) in staging_files {
metrics::STAGING_FILES
.with_label_values(&[&self.stream_name])
.set(files.len() as i64);
.set(arrow_files.len() as i64);

for file in &files {
for file in &arrow_files {
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();

@@ -286,7 +335,7 @@ impl<'a> Stream<'a> {
.add(file_size as i64);
}

let record_reader = MergedReverseRecordReader::try_new(&files);
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
if record_reader.readers.is_empty() {
continue;
}
@@ -319,7 +368,7 @@ impl<'a> Stream<'a> {
);
remove_file(parquet_path).unwrap();
} else {
for file in files {
for file in arrow_files {
// warn!("file-\n{file:?}\n");
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();
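The new `get_schemas_if_present()` and the merged-schema return value described in the `convert_disk_files_to_parquet` doc comment both hinge on combining the per-file schemas written to staging. Below is a self-contained sketch of that idea using arrow's `Schema::try_merge` (which assumes the arrow-schema `serde` feature for deserialization); the helper name and error handling are illustrative, not the PR's code.

```rust
use std::fs::File;
use std::path::Path;

use arrow_schema::Schema;

/// Load every ".schema" file under `data_path` and merge them into one schema.
fn merged_staging_schema(data_path: &Path) -> Option<Schema> {
    let dir = data_path.read_dir().ok()?;

    // keep only schemas that deserialize cleanly, skipping unreadable files
    let schemas: Vec<Schema> = dir
        .flatten()
        .filter(|entry| entry.path().extension().is_some_and(|ext| ext.eq("schema")))
        .filter_map(|entry| File::open(entry.path()).ok())
        .filter_map(|file| serde_json::from_reader(file).ok())
        .collect();

    if schemas.is_empty() {
        return None;
    }

    // try_merge unifies fields across all schemas and errors if the same field
    // appears with incompatible types, so fold that failure into the Option
    Schema::try_merge(schemas).ok()
}
```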