Commit 48bd263 (parent: 1b4ea73)

Changes required for Parseable Enterprise

9 files changed: +214 additions, -14 deletions

src/enterprise/mod.rs — 1 addition, 0 deletions (new file)

pub mod utils;

src/enterprise/utils.rs — 160 additions, 0 deletions (new file)

use std::{collections::HashMap, path::PathBuf, sync::Arc};

use datafusion::{common::Column, prelude::Expr};
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::query::stream_schema_provider::extract_primary_filter;
use crate::{
    catalog::{
        manifest::{File, Manifest},
        snapshot, Snapshot,
    },
    event,
    parseable::PARSEABLE,
    query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
    storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
    utils::time::TimeRange,
};

pub fn create_time_filter(
    time_range: &TimeRange,
    time_partition: Option<String>,
    table_name: &str,
) -> Vec<Expr> {
    let mut new_filters = vec![];
    let start_time = time_range.start.naive_utc();
    let end_time = time_range.end.naive_utc();
    let mut _start_time_filter: Expr;
    let mut _end_time_filter: Expr;

    match time_partition {
        Some(time_partition) => {
            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    time_partition.clone(),
                )));
            _end_time_filter =
                PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(
                    Expr::Column(Column::new(Some(table_name.to_owned()), time_partition)),
                );
        }
        None => {
            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    event::DEFAULT_TIMESTAMP_KEY,
                )));
            _end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    event::DEFAULT_TIMESTAMP_KEY,
                )));
        }
    }

    new_filters.push(_start_time_filter);
    new_filters.push(_end_time_filter);

    new_filters
}
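
A minimal usage sketch (not part of the commit): building the two bound expressions for a stream without a custom time partition. It assumes TimeRange exposes public start/end DateTime<Utc> fields, as the naive_utc() calls above suggest; "demo_stream" is illustrative.

    use chrono::{Duration, Utc};

    // Yields [timestamp >= start, timestamp < end] against DEFAULT_TIMESTAMP_KEY.
    let range = TimeRange {
        start: Utc::now() - Duration::hours(1),
        end: Utc::now(),
    };
    let filters = create_time_filter(&range, None, "demo_stream");
    assert_eq!(filters.len(), 2);
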

pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

    // Merge the snapshots from every stream.json under the stream's root.
    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }

    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await
    .unwrap();

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

    // Drop any file that the time filters prove cannot match.
    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }

    // Group the surviving files by the date/hour/minute portion of their path.
    for file in selected_files {
        let date = file.file_path.split('/').collect_vec();
        let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
        let date = RelativePathBuf::from_iter(date);

        parquet_files.entry(date).or_default().push(file);
    }

    Ok(parquet_files)
}
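
The closing loop above keys each selected file on components 1..4 of its file_path. An illustration of that key extraction, under an assumed path layout (the exact layout is not shown in this commit):

    let file_path = "demo_stream/date=2025-01-01/hour=10/minute=05/file.parquet";
    let parts: Vec<&str> = file_path.split('/').collect();
    let key = RelativePathBuf::from_iter(parts[1..4].iter().map(|s| s.to_string()));
    assert_eq!(key.as_str(), "date=2025-01-01/hour=10/minute=05");
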

async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
    // Spawn one tokio task per manifest so the fetches run concurrently.
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await.unwrap()
        }));
    });

    let mut op = Vec::new();
    for task in tasks {
        let file = task.await.unwrap();
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| serde_json::from_slice(&res).unwrap())
        .collect())
}
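
End to end, a hypothetical enterprise-side caller would combine the two public helpers. Note that storage failures inside fetch_parquet_file_paths are unwrapped, so they surface as panics rather than Err values. Sketch only, inside an async fn returning anyhow::Result<()>; "demo_stream" is illustrative.

    use chrono::{Duration, Utc};

    // Collect last hour's parquet files, grouped by date prefix.
    let range = TimeRange {
        start: Utc::now() - Duration::hours(1),
        end: Utc::now(),
    };
    let by_prefix = fetch_parquet_file_paths("demo_stream", &range).await?;
    for (prefix, files) in &by_prefix {
        println!("{prefix}: {} parquet file(s)", files.len());
    }
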

src/lib.rs — 4 additions, 3 deletions

@@ -21,11 +21,12 @@ pub mod alerts;
 pub mod analytics;
 pub mod audit;
 pub mod banner;
-mod catalog;
+pub mod catalog;
 mod cli;
 #[cfg(feature = "kafka")]
 pub mod connectors;
 pub mod correlation;
+pub mod enterprise;
 mod event;
 pub mod handlers;
 pub mod hottier;
@@ -37,15 +38,15 @@ mod oidc;
 pub mod option;
 pub mod otel;
 pub mod parseable;
-mod query;
+pub mod query;
 pub mod rbac;
 mod response;
 mod static_schema;
 mod stats;
 pub mod storage;
 pub mod sync;
 pub mod users;
-mod utils;
+pub mod utils;
 mod validator;

 use std::time::Duration;
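
These visibility changes (catalog, query, and utils made pub, plus the new enterprise module) are what allow an out-of-tree crate to reach the helpers above. A hypothetical downstream import, assuming such a crate depends on parseable as a library:

    use parseable::enterprise::utils::{create_time_filter, fetch_parquet_file_paths};
    use parseable::utils::time::TimeRange;
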

src/parseable/streams.rs — 1 addition, 4 deletions

@@ -513,10 +513,7 @@ impl Stream {
         let file_size = match file.metadata() {
             Ok(meta) => meta.len(),
             Err(err) => {
-                warn!(
-                    "File ({}) not found; Error = {err}",
-                    file.display()
-                );
+                warn!("File ({}) not found; Error = {err}", file.display());
                 continue;
             }
         };

src/query/stream_schema_provider.rs — 1 addition, 1 deletion

@@ -894,7 +894,7 @@ pub fn extract_primary_filter(
         .collect()
 }

-trait ManifestExt: ManifestFile {
+pub trait ManifestExt: ManifestFile {
     fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
         let name = match partial_filter {
             Expr::BinaryExpr(binary_expr) => {

src/storage/azure_blob.rs — 12 additions, 1 deletion

@@ -35,9 +35,10 @@ use datafusion::{
 use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
 use object_store::{
     azure::{MicrosoftAzure, MicrosoftAzureBuilder},
+    buffered::BufReader,
     limit::LimitStore,
     path::Path as StorePath,
-    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
+    BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
 };
 use relative_path::{RelativePath, RelativePathBuf};
 use tracing::{error, info};
@@ -423,6 +424,16 @@ impl BlobStore {

 #[async_trait]
 impl ObjectStorage for BlobStore {
+    async fn get_buffered_reader(
+        &self,
+        _path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+        unimplemented!()
+    }
+    async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+        unimplemented!()
+    }
+
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         Ok(self._get_object(path).await?)
     }
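
BlobStore leaves both new methods as unimplemented!() stubs. For head(), a working version would presumably mirror the S3 implementation at the end of this commit; a hedged sketch, assuming a to_object_store_path helper is in scope as in s3.rs:

    // Illustrative only; not part of the commit.
    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
        Ok(self.client.head(&to_object_store_path(path)).await.unwrap())
    }

get_buffered_reader is less direct here: BufReader::new takes an Arc<dyn ObjectStore>, which is easiest to produce by cloning a bare client — presumably why the S3 struct below drops its LimitStore wrapper.
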

src/storage/localfs.rs — 10 additions, 0 deletions

@@ -28,6 +28,7 @@ use bytes::Bytes;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
 use fs_extra::file::CopyOptions;
 use futures::{stream::FuturesUnordered, TryStreamExt};
+use object_store::{buffered::BufReader, ObjectMeta};
 use relative_path::{RelativePath, RelativePathBuf};
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
@@ -103,6 +104,15 @@ impl LocalFS {

 #[async_trait]
 impl ObjectStorage for LocalFS {
+    async fn get_buffered_reader(
+        &self,
+        _path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+        unimplemented!()
+    }
+    async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+        unimplemented!()
+    }
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         let time = Instant::now();
         let file_path = self.path_in_root(path);

src/storage/object_storage.rs — 7 additions, 0 deletions

@@ -33,6 +33,8 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
+use object_store::buffered::BufReader;
+use object_store::ObjectMeta;
 use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
@@ -74,6 +76,11 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync

 #[async_trait]
 pub trait ObjectStorage: Debug + Send + Sync + 'static {
+    async fn get_buffered_reader(
+        &self,
+        path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError>;
+    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError>;
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
     // TODO: make the filter function optional as we may want to get all objects
     async fn get_objects(
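
The two new trait methods give every backend a metadata lookup and a seekable, lazily fetching handle. A hedged consumer sketch (storage and path are illustrative names; relies on BufReader's tokio AsyncRead/AsyncSeek impls, inside an async fn returning anyhow::Result<()>):

    use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};

    // Read just the last 8 bytes (e.g. a parquet footer) without
    // downloading the whole object.
    let meta = storage.head(&path).await?;
    assert!(meta.size >= 8);
    let mut reader = storage.get_buffered_reader(&path).await?;
    reader.seek(SeekFrom::End(-8)).await?;
    let mut footer = [0u8; 8];
    reader.read_exact(&mut footer).await?;
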

src/storage/s3.rs — 18 additions, 5 deletions

@@ -37,9 +37,10 @@ use datafusion::{
 use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
 use object_store::{
     aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum},
+    buffered::BufReader,
     limit::LimitStore,
     path::Path as StorePath,
-    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
+    BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
 };
 use relative_path::{RelativePath, RelativePathBuf};
 use tracing::{error, info};
@@ -310,9 +311,6 @@ impl ObjectStorageProvider for S3Config {
     fn construct_client(&self) -> Arc<dyn ObjectStorage> {
         let s3 = self.get_default_builder().build().unwrap();

-        // limit objectstore to a concurrent request limit
-        let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
-
         Arc::new(S3 {
             client: s3,
             bucket: self.bucket_name.clone(),
@@ -331,7 +329,7 @@ impl ObjectStorageProvider for S3Config {

 #[derive(Debug)]
 pub struct S3 {
-    client: LimitStore<AmazonS3>,
+    client: AmazonS3,
     bucket: String,
     root: StorePath,
 }
@@ -557,6 +555,21 @@ impl S3 {

 #[async_trait]
 impl ObjectStorage for S3 {
+    async fn get_buffered_reader(
+        &self,
+        path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+        let path = &to_object_store_path(path);
+        let meta = self.client.head(path).await.unwrap();
+
+        let store: Arc<dyn ObjectStore> = Arc::new(self.client.clone());
+        let buf = object_store::buffered::BufReader::new(store, &meta);
+        Ok(buf)
+    }
+    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+        Ok(self.client.head(&to_object_store_path(path)).await.unwrap())
+    }
+
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         Ok(self._get_object(path).await?)
     }
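
Dropping the LimitStore wrapper means S3 requests are no longer capped at MAX_OBJECT_STORE_REQUESTS; in exchange, the bare AmazonS3 client is Clone, which get_buffered_reader uses to build its Arc<dyn ObjectStore>. A hypothetical variant that keeps the cap at the point of reader construction (illustrative, not part of the commit):

    // LimitStore itself implements ObjectStore, so it can back the reader.
    let store: Arc<dyn ObjectStore> =
        Arc::new(LimitStore::new(self.client.clone(), super::MAX_OBJECT_STORE_REQUESTS));
    Ok(BufReader::new(store, &meta))
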
