Skip to content

Commit 1bf85de

Browse files
fix: querier to use manifest list from all ingestors (#747)
1 parent eddd332 commit 1bf85de

File tree

2 files changed

+43
-9
lines changed

2 files changed

+43
-9
lines changed

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
target
2-
data
3-
staging
2+
data*
3+
staging*
44
limitcache
55
examples
66
cert.pem

server/src/query/stream_schema_provider.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
*
1717
*/
1818

19+
use crate::Mode;
20+
use crate::{
21+
catalog::snapshot::{self, Snapshot},
22+
storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
23+
};
1924
use arrow_array::RecordBatch;
2025
use arrow_schema::{Schema, SchemaRef, SortOptions};
2126
use bytes::Bytes;
@@ -44,13 +49,13 @@ use datafusion::{
4449
use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
4550
use itertools::Itertools;
4651
use object_store::{path::Path, ObjectStore};
52+
use relative_path::RelativePathBuf;
4753
use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
4854
use url::Url;
4955

5056
use crate::{
5157
catalog::{
5258
self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile,
53-
Snapshot,
5459
},
5560
event::{self, DEFAULT_TIMESTAMP_KEY},
5661
localcache::LocalCacheManager,
@@ -61,6 +66,7 @@ use crate::{
6166
};
6267

6368
use super::listing_table_builder::ListingTableBuilder;
69+
use crate::catalog::Snapshot as CatalogSnapshot;
6470

6571
// schema provider for stream based on global data
6672
pub struct GlobalSchemaProvider {
@@ -316,12 +322,34 @@ impl TableProvider for StandardTableProvider {
316322
);
317323
}
318324
};
319-
320-
// Fetch snapshot
321-
let snapshot = object_store_format.snapshot;
325+
let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
326+
if CONFIG.parseable.mode == Mode::Query {
327+
let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
328+
let obs = glob_storage
329+
.get_objects(
330+
Some(&path),
331+
Box::new(|file_name| file_name.starts_with(".ingester")),
332+
)
333+
.await;
334+
335+
if let Ok(obs) = obs {
336+
for ob in obs {
337+
if let Ok(object_store_format) =
338+
serde_json::from_slice::<ObjectStoreFormat>(&ob)
339+
{
340+
let snapshot = object_store_format.snapshot;
341+
for manifest in snapshot.manifest_list {
342+
merged_snapshot.manifest_list.push(manifest);
343+
}
344+
}
345+
}
346+
}
347+
} else {
348+
merged_snapshot = object_store_format.snapshot;
349+
}
322350

323351
// Is query timerange is overlapping with older data.
324-
if is_overlapping_query(&snapshot.manifest_list, &time_filters) {
352+
if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
325353
return legacy_listing_table(
326354
self.stream.clone(),
327355
memory_exec,
@@ -338,8 +366,14 @@ impl TableProvider for StandardTableProvider {
338366
.await;
339367
}
340368

341-
let mut manifest_files =
342-
collect_from_snapshot(&snapshot, &time_filters, object_store, filters, limit).await?;
369+
let mut manifest_files = collect_from_snapshot(
370+
&merged_snapshot,
371+
&time_filters,
372+
object_store,
373+
filters,
374+
limit,
375+
)
376+
.await?;
343377

344378
if manifest_files.is_empty() {
345379
return final_plan(vec![memory_exec], projection, self.schema.clone());

0 commit comments

Comments
 (0)