From fa34d3a4d492b8160b0130a215677a1ce9d39f28 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 10:52:28 +0530
Subject: [PATCH 01/10] Add alternate arrow reader
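
Ingested events are appended to Arrow IPC stream files oldest-first,
while queries want the most recent data first. This patch adds a
reverse reader that scans a stream file once to index message offsets,
then replays its record batches last-to-first, plus a
MergedReverseRecordReader that k-merges several such files by
timestamp. Intended usage, as a sketch (the file name is illustrative):

    use std::fs::File;

    let reader = get_reverse_reader(File::open("stream.data.arrows")?)?;
    for batch in reader.flatten() {
        // batches arrive newest-first; rows inside each batch
        // keep the order they were written in
    }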
---
 Cargo.lock                               |   8 +-
 server/Cargo.toml                        |   4 +-
 server/src/storage/staging.rs            |   4 +-
 server/src/utils/arrow.rs                |   1 +
 server/src/utils/arrow/merged_reader.rs  |  41 +++-
 server/src/utils/arrow/reverse_reader.rs | 290 +++++++++++++++++++++++
 6 files changed, 332 insertions(+), 16 deletions(-)
 create mode 100644 server/src/utils/arrow/reverse_reader.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4b24d0ccd..47af15082 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1091,12 +1091,6 @@ dependencies = [
  "version_check",
 ]
 
-[[package]]
-name = "cookies"
-version = "0.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5ed970f886b2692d34c1976254bfae22336b12733ecb37ef0fad32128ff4bfb"
-
 [[package]]
 name = "core-foundation-sys"
 version = "0.8.3"
@@ -2661,6 +2655,7 @@ dependencies = [
  "arrow-select",
  "async-trait",
  "base64 0.21.0",
+ "byteorder",
  "bytes",
  "bzip2",
  "cargo_toml",
@@ -2669,7 +2664,6 @@ dependencies = [
  "clap",
  "clokwerk",
  "cookie 0.17.0",
- "cookies",
  "crossterm",
  "datafusion",
  "derive_more",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index c6daf918c..dc762bf66 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -38,7 +38,9 @@ argon2 = "0.5.0"
 async-trait = "0.1"
 base64 = "0.21"
 bytes = "1.4"
+byteorder = "1.4.3"
 bzip2 = { version = "*", features = ["static"] }
+cookie = "0.17.0"
 chrono = "0.4"
 chrono-humanize = "0.2"
 clap = { version = "4.1", default-features = false, features = [
@@ -97,8 +99,6 @@ humantime = "2.1.0"
 openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
 url = "2.4.0"
 http-auth-basic = "0.3.3"
-cookies = "0.0.1"
-cookie = "0.17.0"
 
 [build-dependencies]
 cargo_toml = "0.15"
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index 65d65f97c..ec4c64c64 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -41,7 +41,7 @@ use crate::{
     metrics,
     option::CONFIG,
     storage::OBJECT_STORE_DATA_GRANULARITY,
-    utils::{self, arrow::MergedRecordReader},
+    utils::{self, arrow::merged_reader::MergedReverseRecordReader},
 };
 
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
@@ -198,7 +198,7 @@ pub fn convert_disk_files_to_parquet(
             .add(file_size as i64);
     }
 
-    let record_reader = MergedRecordReader::try_new(&files).unwrap();
+    let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();
 
     let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs
index 5719001d5..4d1a03ccf 100644
--- a/server/src/utils/arrow.rs
+++ b/server/src/utils/arrow.rs
@@ -25,6 +25,7 @@ use itertools::Itertools;
 
 pub mod batch_adapter;
 pub mod merged_reader;
+pub mod reverse_reader;
 
 pub use batch_adapter::adapt_batch;
 pub use merged_reader::MergedRecordReader;
diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs
index 19a29ac6d..a1f0d090c 100644
--- a/server/src/utils/arrow/merged_reader.rs
+++ b/server/src/utils/arrow/merged_reader.rs
@@ -24,8 +24,11 @@
 use arrow_ipc::reader::StreamReader;
 use arrow_schema::Schema;
 use itertools::kmerge_by;
 
-use super::adapt_batch;
-use crate::event::DEFAULT_TIMESTAMP_KEY;
+use super::{
+    adapt_batch,
+    reverse_reader::{reverse, OffsetReader},
+};
+use crate::{event::DEFAULT_TIMESTAMP_KEY, utils};
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
@@ -44,14 +47,42 @@ impl MergedRecordReader {
         Ok(Self { readers })
     }
 
-    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
-        let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten());
+    pub fn merged_schema(&self) -> Schema {
+        Schema::try_merge(
+            self.readers
+                .iter()
+                .map(|reader| reader.schema().as_ref().clone()),
+        )
+        .unwrap()
+    }
+}
 
+#[derive(Debug)]
+pub struct MergedReverseRecordReader {
+    pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
+}
+
+impl MergedReverseRecordReader {
+    pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
+        let mut readers = Vec::with_capacity(files.len());
+        for file in files {
+            let reader =
+                utils::arrow::reverse_reader::get_reverse_reader(File::open(file).unwrap())
+                    .map_err(|_| ())?;
+            readers.push(reader);
+        }
+
+        Ok(Self { readers })
+    }
+
+    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
+        let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten());
         kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| {
             let a_time = get_timestamp_millis(a);
             let b_time = get_timestamp_millis(b);
-            a_time < b_time
+            a_time > b_time
         })
+        .map(|batch| reverse(&batch))
         .map(move |batch| adapt_batch(&schema, &batch))
     }
diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
new file mode 100644
index 000000000..a9ea50011
--- /dev/null
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -0,0 +1,290 @@
+use std::{
+    io::{self, BufReader, Read, Seek, SeekFrom},
+    vec::IntoIter,
+};
+
+use arrow_array::{RecordBatch, UInt64Array};
+use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
+use arrow_select::take::take;
+use byteorder::{LittleEndian, ReadBytesExt};
+
+/// OffsetReader takes in a reader and a list of offsets and sizes, and
+/// provides a reader over the file by reading only those offsets, from
+/// the start of the list to its end.
+///
+/// Safety invariant: the reader is already validated and all offsets and
+/// limits are valid to read.
+///
+/// On an empty list the reader returns no bytes read.
+pub struct OffsetReader<R> {
+    reader: R,
+    offset_list: IntoIter<(u64, usize)>,
+    current_offset: u64,
+    current_size: usize,
+    buffer: Vec<u8>,
+    buffer_position: usize,
+    finished: bool,
+}
+
+impl<R: Read + Seek> OffsetReader<R> {
+    fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self {
+        let mut offset_list = offset_list.into_iter();
+        let mut finished = false;
+
+        let (current_offset, current_size) = offset_list.next().unwrap_or_default();
+        if current_offset == 0 && current_size == 0 {
+            finished = true
+        }
+
+        OffsetReader {
+            reader,
+            offset_list,
+            current_offset,
+            current_size,
+            buffer: vec![0; 4096],
+            buffer_position: 0,
+            finished,
+        }
+    }
+}
+
+impl<R: Read + Seek> Read for OffsetReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let offset = self.current_offset;
+        let size = self.current_size;
+
+        if self.finished {
+            return Ok(0);
+        }
+        // on an empty buffer, load the segment described by
+        // current_offset and current_size into the internal buffer
+        if self.buffer_position == 0 {
+            self.reader.seek(SeekFrom::Start(offset))?;
+            // resize for current message
+            if self.buffer.capacity() < size {
+                self.buffer.resize(size, 0)
+            }
+            self.reader.read_exact(&mut self.buffer[0..size])?;
+        }
+
+        let remaining_bytes = size - self.buffer_position;
+        let max_read = usize::min(remaining_bytes, buf.len());
+
+        // Copy data from the internal buffer to the provided buffer
+        let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read];
+        buf[..max_read].copy_from_slice(read_data);
+
+        self.buffer_position += max_read;
+
+        if self.buffer_position >= size {
+            // If we've read the entire section, move to the next offset
+            match self.offset_list.next() {
+                Some((offset, size)) => {
+                    self.current_offset = offset;
+                    self.current_size = size;
+                    self.buffer_position = 0;
+                }
+                None => {
+                    // iter is exhausted, no more reads can be done
+                    self.finished = true
+                }
+            }
+        }
+
+        Ok(max_read)
+    }
+}
+
+pub fn get_reverse_reader<T: Read + Seek>(
+    mut reader: T,
+) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
+    let mut offset = 0;
+    let mut messages = Vec::new();
+
+    while let Some(res) = find_limit_and_type(&mut reader).transpose() {
+        match res {
+            Ok((header, size)) => {
+                messages.push((header, offset, size));
+                offset += size;
+            }
+            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
+            Err(err) => return Err(err),
+        }
+    }
+
+    // reverse everything except the first message, which is the schema
+    messages[1..].reverse();
+    let messages = messages
+        .into_iter()
+        .map(|(_, offset, size)| (offset as u64, size))
+        .collect();
+
+    // reset reader
+    reader.seek(SeekFrom::Start(0)).unwrap();
+
+    Ok(StreamReader::try_new(OffsetReader::new(reader, messages), None).unwrap())
+}
+
+pub fn reverse(rb: &RecordBatch) -> RecordBatch {
+    let indices = UInt64Array::from_iter_values((0..rb.num_rows()).rev().map(|x| x as u64));
+    let arrays = rb
+        .columns()
+        .iter()
+        .map(|col| take(&col, &indices, None).unwrap())
+        .collect();
+    RecordBatch::try_new(rb.schema(), arrays).unwrap()
+}
+
+// Returns the header type and the total encoded length of the next message.
+fn find_limit_and_type(
+    reader: &mut (impl Read + Seek),
+) -> Result<Option<(MessageHeader, usize)>, io::Error> {
+    let mut size = 0;
+    let marker = reader.read_u32::<LittleEndian>()?;
+    size += 4;
+
+    if marker != 0xFFFFFFFF {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "Invalid Continuation Marker",
+        ));
+    }
+
+    let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
+    size += 4;
+
+    if metadata_size == 0x00000000 {
+        return Ok(None);
+    }
+
+    let mut message = vec![0u8; metadata_size];
+    reader.read_exact(&mut message)?;
+    size += metadata_size;
+
+    let message = unsafe { root_as_message_unchecked(&mut message) };
+    let header = message.header_type();
+    let message_size = message.bodyLength();
+    size += message_size as usize;
+
+    let padding = (8 - (size % 8)) % 8;
+    reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
+    size += padding;
+
+    Ok(Some((header, size)))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{io::Cursor, sync::Arc};
+
+    use arrow_array::{
+        cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
+    };
+    use arrow_ipc::writer::{
+        write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
+    };
+
+    use super::get_reverse_reader;
+
+    fn rb(rows: usize) -> RecordBatch {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
+            (0..rows).map(|x| Some(format!("str {}", x))),
+        ));
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap()
+    }
+
+    fn write_mem(rbs: &[RecordBatch]) -> Vec<u8> {
+        let buf = Vec::new();
+        let mut writer = StreamWriter::try_new(buf, &rbs[0].schema()).unwrap();
+
+        for rb in rbs {
+            writer.write(rb).unwrap()
+        }
+
+        writer.into_inner().unwrap()
+    }
+
+    #[test]
+    fn test_empty_row() {
+        let rb = rb(0);
+        let buf = write_mem(&[rb]);
+        let reader = Cursor::new(buf);
+        let mut reader = get_reverse_reader(reader).unwrap();
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 0);
+    }
+
+    #[test]
+    fn test_one_row() {
+        let rb = rb(1);
+        let buf = write_mem(&[rb]);
+        let reader = Cursor::new(buf);
+        let mut reader = get_reverse_reader(reader).unwrap();
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 1);
+    }
+
+    #[test]
+    fn test_multiple_row_multiple_rbs() {
+        let buf = write_mem(&[rb(1), rb(2), rb(3)]);
+        let reader = Cursor::new(buf);
+        let mut reader = get_reverse_reader(reader).unwrap();
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 3);
+        let col1_val: Vec<i64> = rb
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .flatten()
+            .collect();
+        assert_eq!(col1_val, vec![2, 1, 0]);
+
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 2);
+
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 1);
+    }
+
+    #[test]
+    fn manual_write() {
+        let error_on_replacement = true;
+        let options = IpcWriteOptions::default();
+        let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
+        let data_gen = IpcDataGenerator {};
+
+        let mut buf = Vec::new();
+        let rb1 = rb(1);
+
+        let schema = data_gen.schema_to_bytes(&rb1.schema(), &options);
+        write_message(&mut buf, schema, &options).unwrap();
+
+        for i in (1..=3).cycle().skip(1).take(10000) {
+            let (_, encoded_message) = data_gen
+                .encoded_batch(&rb(i), &mut dictionary_tracker, &options)
+                .unwrap();
+            write_message(&mut buf, encoded_message, &options).unwrap();
+        }
+
+        let schema = data_gen.schema_to_bytes(&rb1.schema(), &options);
+        write_message(&mut buf, schema, &options).unwrap();
+
+        let buf = Cursor::new(buf);
+        let mut reader = get_reverse_reader(buf).unwrap().flatten();
+
+        let mut sum = 0;
+        while let Some(rb) = reader.next() {
+            sum += 1;
+            assert!(rb.num_rows() > 0);
+        }
+
+        assert_eq!(sum, 10000);
+    }
+}
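
A note on the format the new reader relies on: every Arrow IPC stream
message is framed as a 4-byte continuation marker (0xFFFFFFFF), a
little-endian u32 metadata length (0 marks end-of-stream), the
flatbuffer metadata itself, and then the message body, with the whole
unit padded out to an 8-byte boundary. That is exactly what
find_limit_and_type walks. With illustrative byte counts (not taken
from a real file): 184 bytes of metadata and a 64-byte body give
size = 4 + 4 + 184 + 64 = 256, which is already 8-byte aligned, so the
padding adds 0 and the next marker starts at offset 256.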
From f90d915021b0e290e901e0bf4ffd4595d427ff49 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 11:07:06 +0530
Subject: [PATCH 02/10] Fix
---
 server/src/utils/arrow/reverse_reader.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
index a9ea50011..28d0890a1 100644
--- a/server/src/utils/arrow/reverse_reader.rs
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -160,7 +160,7 @@ fn find_limit_and_type(
     reader.read_exact(&mut message)?;
     size += metadata_size;
 
-    let message = unsafe { root_as_message_unchecked(&mut message) };
+    let message = unsafe { root_as_message_unchecked(&message) };
     let header = message.header_type();
     let message_size = message.bodyLength();
     size += message_size as usize;

From 2d1771c46feb0f53293e34d4013697a1cf40e5f3 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 11:11:21 +0530
Subject: [PATCH 03/10] Fix
---
 server/src/utils/arrow/reverse_reader.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
index 28d0890a1..ae791e66e 100644
--- a/server/src/utils/arrow/reverse_reader.rs
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -119,7 +119,7 @@ pub fn get_reverse_reader<T: Read + Seek>(
         .collect();
 
     // reset reader
-    reader.seek(SeekFrom::Start(0)).unwrap();
+    reader.rewind();
 
     Ok(StreamReader::try_new(OffsetReader::new(reader, messages), None).unwrap())
 }

From 3c851431e3476af216fb473ed225942ea8f725d2 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 11:14:12 +0530
Subject: [PATCH 04/10] Test fix
---
 server/src/utils/arrow/reverse_reader.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
index ae791e66e..53d66ae46 100644
--- a/server/src/utils/arrow/reverse_reader.rs
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -244,7 +244,7 @@ mod tests {
             .iter()
             .flatten()
             .collect();
-        assert_eq!(col1_val, vec![2, 1, 0]);
+        assert_eq!(col1_val, vec![0, 1, 2]);
 
         let rb = reader.next().unwrap().unwrap();
         assert_eq!(rb.num_rows(), 2);
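
On the expectation changed above: get_reverse_reader reverses the order
of messages in the stream, not the rows inside a batch; row-level
reversal happens later, in MergedReverseRecordReader::merged_iter via
reverse(). A minimal check of that contract, reusing the rb/write_mem
helpers from the test module (a sketch, not part of the series):

    let buf = write_mem(&[rb(2), rb(3)]);
    let mut reader = get_reverse_reader(Cursor::new(buf)).unwrap();
    let first = reader.next().unwrap().unwrap();
    // the last-written batch surfaces first...
    assert_eq!(first.num_rows(), 3);
    // ...but its rows keep their written order, hence [0, 1, 2]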
From 32086d2b558efb10fa74b0a156f8fb139d86eaed Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 11:19:32 +0530
Subject: [PATCH 05/10] Fix
---
 server/src/utils/arrow/reverse_reader.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
index 53d66ae46..27e3a07ee 100644
--- a/server/src/utils/arrow/reverse_reader.rs
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -119,7 +119,7 @@ pub fn get_reverse_reader<T: Read + Seek>(
         .collect();
 
     // reset reader
-    reader.rewind();
+    reader.rewind()?;
 
     Ok(StreamReader::try_new(OffsetReader::new(reader, messages), None).unwrap())
 }

From 74b392642f8835b8f020b7ca04ec585f96d76f26 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 13:15:48 +0530
Subject: [PATCH 06/10] Change to len
---
 server/src/utils/arrow/reverse_reader.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
index 27e3a07ee..30546a0a0 100644
--- a/server/src/utils/arrow/reverse_reader.rs
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -60,7 +60,7 @@ impl<R: Read + Seek> Read for OffsetReader<R> {
         if self.buffer_position == 0 {
             self.reader.seek(SeekFrom::Start(offset))?;
             // resize for current message
-            if self.buffer.capacity() < size {
+            if self.buffer.len() < size {
                 self.buffer.resize(size, 0)
             }
             self.reader.read_exact(&mut self.buffer[0..size])?;
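
The distinction behind the change above: Vec::resize grows len, the
allocator may leave capacity well past it, and slice indexing is
bounded by len, not capacity. Gating the resize on capacity() can
therefore skip a resize that is still needed and make
self.buffer[0..size] panic. A standalone repro sketch (not from the
patch):

    let mut buf = vec![0u8; 4]; // len == 4, capacity == 4
    buf.reserve(12);            // capacity >= 16, len still 4
    assert!(buf.capacity() >= 8 && buf.len() == 4);
    // buf.capacity() < 8 is false, yet &buf[0..8] would panic,
    // because slicing checks len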
From 78a463b7fd02880af839333f7a185cfb0a68d691 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 31 Oct 2023 19:11:05 +0530
Subject: [PATCH 07/10] Change sort information
---
 server/src/query.rs                |  2 +-
 server/src/query/table_provider.rs | 13 +++++++++++++
 server/src/storage/staging.rs      |  4 ++--
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/server/src/query.rs b/server/src/query.rs
index 6e251ed79..333863108 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -171,7 +171,7 @@ impl Query {
             return Ok(None);
         }
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(false, true)]];
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
             .with_file_sort_order(file_sort_order)
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
index 275cc9914..10579d967 100644
--- a/server/src/query/table_provider.rs
+++ b/server/src/query/table_provider.rs
@@ -34,6 +34,8 @@ use std::any::Any;
 use std::sync::Arc;
 use std::vec;
 
+use crate::utils::arrow::reverse_reader::reverse;
+
 pub struct QueryTableProvider {
     staging: Option<Vec<RecordBatch>>,
     // remote table
@@ -47,6 +49,17 @@ impl QueryTableProvider {
         storage: Option<Arc<ListingTable>>,
         schema: Arc<Schema>,
     ) -> Result<Self, DataFusionError> {
+        // in place reverse transform
+        let staging = if let Some(mut staged_batches) = staging {
+            staged_batches[..].reverse();
+            staged_batches
+                .iter_mut()
+                .for_each(|batch| *batch = reverse(batch));
+            Some(staged_batches)
+        } else {
+            None
+        };
+
         let memtable = staging
             .map(|records| MemTable::try_new(schema.clone(), vec![records]))
             .transpose()?;
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index ec4c64c64..80af0d47f 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -239,8 +239,8 @@ fn parquet_writer_props() -> WriterPropertiesBuilder {
         )
         .set_sorting_columns(Some(vec![SortingColumn {
             column_idx: 0,
-            descending: false,
-            nulls_first: false,
+            descending: true,
+            nulls_first: true,
         }]))
 }
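
What this patch declares, end to end: in DataFusion's expression API
the two booleans of Expr::sort are (asc, nulls_first), so the new
listing order is descending with nulls first, matching both the
SortingColumn { descending: true, nulls_first: true } now written into
the parquet footer and the reversed staging batches above. A sketch of
the reading (assuming the usual datafusion imports):

    // col(DEFAULT_TIMESTAMP_KEY).sort(asc, nulls_first)
    let ord = col(DEFAULT_TIMESTAMP_KEY).sort(false, true);
    // reads as: ORDER BY <timestamp> DESC NULLS FIRST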
From d6c1826d2090ab4072ade27765a8da41db0427df Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Wed, 1 Nov 2023 22:32:35 +0530
Subject: [PATCH 08/10] Add extra time interval to object sync
---
 server/src/main.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/server/src/main.rs b/server/src/main.rs
index 3cf36b57d..74e2e523b 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -16,7 +16,7 @@
  *
  */
 
-use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
+use clokwerk::{AsyncScheduler, Job, Scheduler, TimeUnits};
 use thread_priority::{ThreadBuilder, ThreadPriority};
 use tokio::sync::oneshot;
 use tokio::sync::oneshot::error::TryRecvError;
@@ -122,6 +122,8 @@ fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
     let mut scheduler = AsyncScheduler::new();
     scheduler
         .every((CONFIG.parseable.upload_interval as u32).seconds())
+        // Extra time interval is added so that this scheduler does not race with local sync.
+        .plus(5u32.seconds())
         .run(|| async {
             if let Err(e) = CONFIG.storage().get_object_store().sync().await {
                 log::warn!("failed to sync local data with object store. {:?}", e);
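
On the stagger added above: clokwerk's plus() comes from its Job trait
(hence the new import) and, as I read the clokwerk docs, it extends the
repetition interval itself, so the object-store sync runs every
upload_interval + 5 seconds and drifts away from a job ticking at the
plain upload_interval, rather than firing at the same instant. A sketch
with illustrative numbers:

    use clokwerk::{AsyncScheduler, Job, TimeUnits};

    let mut scheduler = AsyncScheduler::new();
    // with upload_interval = 60 this fires at ~65s, ~130s, ~195s, ...
    scheduler
        .every(60u32.seconds())
        .plus(5u32.seconds())
        .run(|| async { /* object store sync */ });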
From c29dddd93e0bcc9abd0b48826d32b3a60b1fb060 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Wed, 1 Nov 2023 22:33:12 +0530
Subject: [PATCH 09/10] Use new console release
---
 server/Cargo.toml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/Cargo.toml b/server/Cargo.toml
index dc762bf66..5981ded5f 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -114,8 +114,8 @@ maplit = "1.0"
 rstest = "0.16"
 
 [package.metadata.parseable_ui]
-assets-url = "https://github.com/parseablehq/console/releases/download/v0.3.1/build.zip"
-assets-sha1 = "6abd7b5ca5b9c832ff58b8450cffdc83dd7172bf"
+assets-url = "https://github.com/parseablehq/console/releases/download/v0.3.3/build.zip"
+assets-sha1 = "29e1eaa2dfa081d495c1504f05b78914d074990c"
 
 [features]
 debug = []

From 3254e1b5c08f89068f1fa191ab60d8fc08383294 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Wed, 1 Nov 2023 22:36:53 +0530
Subject: [PATCH 10/10] Add header
---
 server/src/utils/arrow/reverse_reader.rs | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/server/src/utils/arrow/reverse_reader.rs b/server/src/utils/arrow/reverse_reader.rs
index 30546a0a0..82c01c1ba 100644
--- a/server/src/utils/arrow/reverse_reader.rs
+++ b/server/src/utils/arrow/reverse_reader.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::{
     io::{self, BufReader, Read, Seek, SeekFrom},
     vec::IntoIter,
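
Taken together, the series makes the staging-to-parquet path emit rows
newest-first and advertises that order to DataFusion. A sketch of the
consuming side, mirroring the staging.rs change in PATCH 01 (files and
the parquet writer are stand-ins):

    let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();
    let schema = Arc::new(record_reader.merged_schema());
    for batch in record_reader.merged_iter(schema.clone()) {
        // batches arrive in descending DEFAULT_TIMESTAMP_KEY order,
        // consistent with the SortingColumn footer metadata above
        writer.write(&batch).unwrap();
    }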