Skip to content

Commit 1ee5dd0

Browse files
author
Devdutt Shenoi
committed
fix: don't flush current data
1 parent dfa9f1c commit 1ee5dd0

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

src/parseable/staging/reader.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -335,11 +335,16 @@ mod tests {
335335
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
336336
};
337337
use arrow_schema::{DataType, Field, Schema};
338+
use chrono::Utc;
338339
use temp_dir::TempDir;
339340

340-
use crate::parseable::staging::{
341-
reader::{MergedReverseRecordReader, OffsetReader},
342-
writer::DiskWriter,
341+
use crate::{
342+
parseable::staging::{
343+
reader::{MergedReverseRecordReader, OffsetReader},
344+
writer::DiskWriter,
345+
},
346+
utils::time::TimeRange,
347+
OBJECT_STORE_DATA_GRANULARITY,
343348
};
344349

345350
use super::get_reverse_reader;
@@ -484,7 +489,9 @@ mod tests {
484489
schema: &Arc<Schema>,
485490
batches: &[RecordBatch],
486491
) -> io::Result<()> {
487-
let mut writer = DiskWriter::try_new(path, schema).expect("Failed to create StreamWriter");
492+
let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
493+
let mut writer =
494+
DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");
488495

489496
for batch in batches {
490497
writer.write(batch).expect("Failed to write batch");

src/parseable/staging/writer.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ use arrow_array::RecordBatch;
2929
use arrow_ipc::writer::StreamWriter;
3030
use arrow_schema::Schema;
3131
use arrow_select::concat::concat_batches;
32+
use chrono::Utc;
3233
use itertools::Itertools;
3334
use tracing::{error, warn};
3435

3536
use crate::{
3637
parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
37-
utils::arrow::adapt_batch,
38+
utils::{arrow::adapt_batch, time::TimeRange},
3839
};
3940

4041
use super::StagingError;
@@ -48,11 +49,16 @@ pub struct Writer {
4849
pub struct DiskWriter {
4950
inner: StreamWriter<BufWriter<File>>,
5051
path: PathBuf,
52+
range: TimeRange,
5153
}
5254

5355
impl DiskWriter {
5456
/// Try to create a file to stream arrows into
55-
pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
57+
pub fn try_new(
58+
path: impl Into<PathBuf>,
59+
schema: &Schema,
60+
range: TimeRange,
61+
) -> Result<Self, StagingError> {
5662
let mut path = path.into();
5763
path.set_extension(PART_FILE_EXTENSION);
5864
let file = OpenOptions::new()
@@ -62,7 +68,11 @@ impl DiskWriter {
6268
.open(&path)?;
6369
let inner = StreamWriter::try_new_buffered(file, schema)?;
6470

65-
Ok(Self { inner, path })
71+
Ok(Self { inner, path, range })
72+
}
73+
74+
pub fn is_current(&self) -> bool {
75+
self.range.contains(Utc::now())
6676
}
6777

6878
/// Write a single recordbatch into file

src/parseable/streams.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use std::{
2929

3030
use arrow_array::RecordBatch;
3131
use arrow_schema::{Field, Fields, Schema};
32-
use chrono::{NaiveDateTime, Timelike};
32+
use chrono::{NaiveDateTime, Timelike, Utc};
3333
use derive_more::{Deref, DerefMut};
3434
use itertools::Itertools;
3535
use parquet::{
@@ -51,7 +51,7 @@ use crate::{
5151
metrics,
5252
option::Mode,
5353
storage::{object_storage::to_bytes, retention::Retention, StreamType},
54-
utils::time::Minute,
54+
utils::time::{Minute, TimeRange},
5555
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
5656
};
5757

@@ -132,7 +132,11 @@ impl Stream {
132132
);
133133
std::fs::create_dir_all(&self.data_path)?;
134134

135-
let mut writer = DiskWriter::try_new(file_path, &record.schema())
135+
let range = TimeRange::granularity_range(
136+
parsed_timestamp.and_local_timezone(Utc).unwrap(),
137+
OBJECT_STORE_DATA_GRANULARITY,
138+
);
139+
let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
136140
.expect("File and RecordBatch both are checked");
137141

138142
writer.write(record)?;
@@ -357,12 +361,12 @@ impl Stream {
357361
self.writer.lock().unwrap().mem.clear();
358362
}
359363

360-
pub fn flush(&self) {
364+
pub fn flush(&self, forced: bool) {
361365
let mut writer = self.writer.lock().unwrap();
362366
// Flush memory
363367
writer.mem.clear();
364368
// Drop schema -> disk writer mapping, triggers flush to disk
365-
writer.disk.drain();
369+
writer.disk.retain(|_, w| !forced && w.is_current());
366370
}
367371

368372
fn parquet_writer_props(
@@ -662,7 +666,7 @@ impl Stream {
662666

663667
/// First flushes arrows onto disk and then converts the arrow into parquet
664668
pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
665-
self.flush();
669+
self.flush(shutdown_signal);
666670

667671
self.prepare_parquet(shutdown_signal)
668672
}
@@ -974,7 +978,7 @@ mod tests {
974978
StreamType::UserDefined,
975979
)
976980
.unwrap();
977-
staging.flush();
981+
staging.flush(true);
978982
}
979983

980984
#[test]

0 commit comments

Comments (0)