Merged
3 changes: 3 additions & 0 deletions src/parseable/mod.rs
@@ -67,6 +67,9 @@ mod streams;
/// File extension for arrow files in staging
const ARROW_FILE_EXTENSION: &str = "arrows";

/// File extension for incomplete arrow files
const PART_FILE_EXTENSION: &str = "part";

/// Name of a Stream
/// NOTE: this used to be a struct, flattened out for simplicity
pub type LogStream = String;
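
The new `PART_FILE_EXTENSION` pairs with `ARROW_FILE_EXTENSION` to describe a staged file's lifecycle: records stream into a `.part` file, which is renamed to `.arrows` only once the writer finishes. A small sketch of the name transition (the example filename is made up, not taken from this diff; real names come from `Stream::filename_by_partition`, added later in this PR):

```rust
use std::path::PathBuf;

fn main() {
    // Hypothetical staged filename, for illustration only.
    let mut path = PathBuf::from("abc.date=2025-01-01.hour=12.minute=30.host.data.arrows");

    // While a DiskWriter is open, only the final extension is swapped out,
    // so the file sits on disk as `<...>.data.part`.
    path.set_extension("part");
    assert!(path.to_str().unwrap().ends_with(".data.part"));

    // Once the stream is finished it is renamed back to `<...>.data.arrows`.
    path.set_extension("arrows");
    assert!(path.to_str().unwrap().ends_with(".data.arrows"));
}
```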
20 changes: 13 additions & 7 deletions src/parseable/staging/reader.rs
@@ -322,7 +322,6 @@ fn find_limit_and_type(
#[cfg(test)]
mod tests {
use std::{
fs::File,
io::{self, Cursor, Read},
path::Path,
sync::Arc,
@@ -336,9 +335,17 @@ mod tests {
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
};
use arrow_schema::{DataType, Field, Schema};
use chrono::Utc;
use temp_dir::TempDir;

use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
use crate::{
parseable::staging::{
reader::{MergedReverseRecordReader, OffsetReader},
writer::DiskWriter,
},
utils::time::TimeRange,
OBJECT_STORE_DATA_GRANULARITY,
};

use super::get_reverse_reader;

@@ -482,15 +489,14 @@
schema: &Arc<Schema>,
batches: &[RecordBatch],
) -> io::Result<()> {
let file = File::create(path)?;
let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
let mut writer =
StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");

for batch in batches {
writer.write(batch).expect("Failed to write batch");
}

writer.finish().expect("Failed to finalize writer");
Ok(())
}

@@ -524,7 +530,7 @@ mod tests {
#[test]
fn test_merged_reverse_record_reader() -> io::Result<()> {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join("test.arrow");
let file_path = dir.path().join("test.data.arrows");

// Create a schema
let schema = Arc::new(Schema::new(vec![
@@ -627,7 +633,7 @@ mod tests {
#[test]
fn test_get_reverse_reader_single_message() -> io::Result<()> {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join("test_single.arrow");
let file_path = dir.path().join("test_single.data.arrows");

// Create a schema
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
71 changes: 68 additions & 3 deletions src/parseable/staging/writer.rs
@@ -19,22 +19,87 @@

use std::{
collections::{HashMap, HashSet},
fs::File,
fs::{File, OpenOptions},
io::BufWriter,
path::PathBuf,
sync::Arc,
};

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::Schema;
use arrow_select::concat::concat_batches;
use chrono::Utc;
use itertools::Itertools;
use tracing::{error, warn};

use crate::utils::arrow::adapt_batch;
use crate::{
parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
utils::{arrow::adapt_batch, time::TimeRange},
};

use super::StagingError;

#[derive(Default)]
pub struct Writer {
pub mem: MemWriter<16384>,
pub disk: HashMap<String, StreamWriter<File>>,
pub disk: HashMap<String, DiskWriter>,
}

pub struct DiskWriter {
inner: StreamWriter<BufWriter<File>>,
path: PathBuf,
range: TimeRange,
}

impl DiskWriter {
/// Try to create a file to stream arrows into
pub fn try_new(
path: impl Into<PathBuf>,
schema: &Schema,
range: TimeRange,
) -> Result<Self, StagingError> {
let mut path = path.into();
path.set_extension(PART_FILE_EXTENSION);
let file = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&path)?;
let inner = StreamWriter::try_new_buffered(file, schema)?;

Ok(Self { inner, path, range })
}

pub fn is_current(&self) -> bool {
self.range.contains(Utc::now())
}

/// Write a single record batch into the file
pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
self.inner.write(rb).map_err(StagingError::Arrow)
}
}

impl Drop for DiskWriter {
/// Write the end-of-stream continuation bytes to mark the file as complete, then rename it to `.data.arrows`
fn drop(&mut self) {
if let Err(err) = self.inner.finish() {
error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
return;
}

let mut arrow_path = self.path.to_owned();
arrow_path.set_extension(ARROW_FILE_EXTENSION);

if arrow_path.exists() {
warn!("File {arrow_path:?} exists and will be overwritten");
}

if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
error!("Couldn't rename file {:?}, error = {err}", self.path);
}
}
}

/// Structure to keep recordbatches in memory.
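To make the `DiskWriter` lifecycle concrete, here is a minimal usage sketch (not part of the diff). It assumes the crate-internal paths visible in the `reader.rs` test imports above; the helper name and the file name are illustrative only:

```rust
use std::{path::Path, sync::Arc};

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use chrono::Utc;

use crate::{
    parseable::staging::{writer::DiskWriter, StagingError},
    utils::time::TimeRange,
    OBJECT_STORE_DATA_GRANULARITY,
};

/// Illustrative helper: stage a single record batch under `dir`.
fn stage_one_batch(dir: &Path) -> Result<(), StagingError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column]).expect("column matches schema");

    // Each writer is scoped to one time slot of the object store granularity.
    let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);

    // While open, records stream into `example.data.part`.
    let mut writer = DiskWriter::try_new(dir.join("example.data.arrows"), &schema, range)?;
    writer.write(&batch)?;

    // Dropping the writer finishes the IPC stream and renames the file to
    // `example.data.arrows`, so readers never pick up a half-written file.
    drop(writer);
    Ok(())
}
```

The design choice worth noting is rename-on-drop: finalization is tied to the writer going out of scope, so callers never need to remember a separate `finish()` call.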
82 changes: 35 additions & 47 deletions src/parseable/streams.rs
@@ -28,9 +28,8 @@ use std::{
};

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike};
use chrono::{NaiveDateTime, Timelike, Utc};
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use parquet::{
@@ -55,14 +54,14 @@ use crate::{
metrics,
option::Mode,
storage::{object_storage::to_bytes, retention::Retention, StreamType},
utils::time::Minute,
utils::time::{Minute, TimeRange},
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
};

use super::{
staging::{
reader::{MergedRecordReader, MergedReverseRecordReader},
writer::Writer,
writer::{DiskWriter, Writer},
StagingError,
},
LogStream, ARROW_FILE_EXTENSION,
@@ -123,29 +122,26 @@ impl Stream {
) -> Result<(), StagingError> {
let mut guard = self.writer.lock().unwrap();
if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
match guard.disk.get_mut(schema_key) {
let filename =
self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
match guard.disk.get_mut(&filename) {
Some(writer) => {
writer.write(record)?;
}
None => {
// entry is not present thus we create it
let file_path = self.path_by_current_time(
schema_key,
parsed_timestamp,
custom_partition_values,
);
std::fs::create_dir_all(&self.data_path)?;

let file = OpenOptions::new()
.create(true)
.append(true)
.open(&file_path)?;

let mut writer = StreamWriter::try_new(file, &record.schema())
let range = TimeRange::granularity_range(
parsed_timestamp.and_local_timezone(Utc).unwrap(),
OBJECT_STORE_DATA_GRANULARITY,
);
let file_path = self.data_path.join(&filename);
let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
.expect("File and RecordBatch both are checked");

writer.write(record)?;
guard.disk.insert(schema_key.to_owned(), writer);
guard.disk.insert(filename, writer);
}
};
}
@@ -155,17 +151,17 @@
Ok(())
}

pub fn path_by_current_time(
pub fn filename_by_partition(
&self,
stream_hash: &str,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> PathBuf {
) -> String {
let mut hostname = hostname::get().unwrap().into_string().unwrap();
if let Some(id) = &self.ingestor_id {
hostname.push_str(id);
}
let filename = format!(
format!(
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
@@ -175,8 +171,7 @@
.sorted_by_key(|v| v.0)
.map(|(key, value)| format!("{key}={value}."))
.join("")
);
self.data_path.join(filename)
)
}

pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -366,19 +361,12 @@ impl Stream {
self.writer.lock().unwrap().mem.clear();
}

pub fn flush(&self) {
let mut disk_writers = {
let mut writer = self.writer.lock().unwrap();
// Flush memory
writer.mem.clear();
// Take schema -> disk writer mapping
std::mem::take(&mut writer.disk)
};

// Flush disk
for writer in disk_writers.values_mut() {
_ = writer.finish();
}
pub fn flush(&self, forced: bool) {
let mut writer = self.writer.lock().unwrap();
// Flush memory
writer.mem.clear();
// Evict disk writers whose time slot is no longer current (all of them when forced); dropping a writer flushes and finalizes its file
writer.disk.retain(|_, w| !forced && w.is_current());
}
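
Note that the reworked `flush` never calls a finalizer directly: evicting an entry from the `disk` map is what finalizes the file, through `DiskWriter`'s `Drop` impl. A simplified, standalone sketch of that retain-and-drop interplay (stand-in types, not the PR's code):

```rust
use std::collections::HashMap;

// Stand-in for DiskWriter: as in the real writer, all finalization work
// (finishing the IPC stream, renaming `.part` to `.data.arrows`) lives in Drop.
struct SlotWriter {
    name: String,
    current: bool,
}

impl SlotWriter {
    fn is_current(&self) -> bool {
        self.current
    }
}

impl Drop for SlotWriter {
    fn drop(&mut self) {
        println!("finalized {}", self.name);
    }
}

fn flush(disk: &mut HashMap<String, SlotWriter>, forced: bool) {
    // Keep only writers whose time slot is still open; a forced flush
    // (e.g. on shutdown) evicts everything, finalizing every staged file.
    disk.retain(|_, w| !forced && w.is_current());
}

fn main() {
    let mut disk = HashMap::new();
    disk.insert("a".to_owned(), SlotWriter { name: "a".to_owned(), current: true });
    disk.insert("b".to_owned(), SlotWriter { name: "b".to_owned(), current: false });

    flush(&mut disk, false); // prints "finalized b"; "a" keeps streaming
    flush(&mut disk, true); // prints "finalized a"
}
```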

fn parquet_writer_props(
@@ -733,7 +721,7 @@ impl Stream {

/// First flushes arrows onto disk and then converts the arrow into parquet
pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
self.flush();
self.flush(shutdown_signal);

self.prepare_parquet(shutdown_signal)
}
@@ -944,18 +932,18 @@ mod tests {
None,
);

let expected_path = staging.data_path.join(format!(
let expected = format!(
"{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
hostname::get().unwrap().into_string().unwrap()
));
);

let generated_path =
staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
let generated =
staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);

assert_eq!(generated_path, expected_path);
assert_eq!(generated, expected);
}

#[test]
Expand All @@ -978,18 +966,18 @@ mod tests {
None,
);

let expected_path = staging.data_path.join(format!(
let expected = format!(
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
hostname::get().unwrap().into_string().unwrap()
));
);

let generated_path =
staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
let generated =
staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);

assert_eq!(generated_path, expected_path);
assert_eq!(generated, expected);
}

#[test]
@@ -1045,7 +1033,7 @@
StreamType::UserDefined,
)
.unwrap();
staging.flush();
staging.flush(true);
}

#[test]