Skip to content

Commit 1fc8ed2

Browse files
committed
handle dict preloading for agg
handle cardinality for fields
1 parent ab2145e commit 1fc8ed2

File tree

5 files changed

+126
-25
lines changed

5 files changed

+126
-25
lines changed

quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,7 @@ impl DefaultDocMapperBuilder {
147147
let field_entries = root_field_mapping.compute_field_entries();
148148
let required_fast_fields: Vec<FieldPath> = field_entries
149149
.into_iter()
150-
.filter(|(_, field_type)| match field_type {
151-
FieldType::U64(options)
152-
| FieldType::I64(options)
153-
| FieldType::F64(options)
154-
| FieldType::Date(options) => options.is_fast(),
155-
FieldType::Bytes(option) => option.is_fast(),
156-
_ => false,
157-
})
150+
.filter(|(_, field_type)| field_type.is_fast())
158151
.map(|(field_path, _)| field_path)
159152
.collect();
160153

quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,8 @@ pub struct QuickwitTextOptions {
526526
pub fieldnorms: bool,
527527
#[serde(default = "default_as_true")]
528528
pub stored: bool,
529+
#[serde(default)]
530+
pub fast: bool,
529531
}
530532

531533
impl From<QuickwitTextOptions> for TextOptions {
@@ -534,6 +536,9 @@ impl From<QuickwitTextOptions> for TextOptions {
534536
if quickwit_text_options.stored {
535537
text_options = text_options.set_stored();
536538
}
539+
if quickwit_text_options.fast {
540+
text_options = text_options.set_fast();
541+
}
537542
if quickwit_text_options.indexed {
538543
let mut text_field_indexing = TextFieldIndexing::default();
539544
if let Some(tokenizer_name) = quickwit_text_options.tokenizer {

quickwit-search/src/collector.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use std::collections::{BinaryHeap, HashSet};
2323
use itertools::Itertools;
2424
use quickwit_doc_mapper::{DocMapper, SortBy, SortOrder};
2525
use quickwit_proto::{LeafSearchResponse, PartialHit, SearchRequest};
26-
use tantivy::aggregation::agg_req::{get_fast_field_names, Aggregations};
26+
use tantivy::aggregation::agg_req::{
27+
get_fast_field_names, get_term_dict_field_names, Aggregations,
28+
};
2729
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
2830
use tantivy::aggregation::AggregationSegmentCollector;
2931
use tantivy::collector::{Collector, SegmentCollector};
@@ -228,11 +230,6 @@ impl SegmentCollector for QuickwitSegmentCollector {
228230
}
229231
}
230232

231-
// TODO: seems not very useful, remove it and refactor it.
232-
pub trait GenericQuickwitCollector: Collector {
233-
fn fast_field_names(&self) -> HashSet<String>;
234-
}
235-
236233
/// The quickwit collector is the tantivy Collector used in Quickwit.
237234
///
238235
/// It defines the data that should be accumulated about the documents matching
@@ -250,14 +247,21 @@ pub struct QuickwitCollector {
250247
pub aggregation: Option<Aggregations>,
251248
}
252249

253-
impl GenericQuickwitCollector for QuickwitCollector {
254-
fn fast_field_names(&self) -> HashSet<String> {
250+
impl QuickwitCollector {
251+
pub fn fast_field_names(&self) -> HashSet<String> {
255252
let mut fast_field_names = self.fast_field_names.clone();
256253
if let Some(aggregate) = self.aggregation.as_ref() {
257254
fast_field_names.extend(get_fast_field_names(aggregate));
258255
}
259256
fast_field_names
260257
}
258+
pub fn term_dict_field_names(&self) -> HashSet<String> {
259+
let mut term_dict_field_names = HashSet::default();
260+
if let Some(aggregate) = self.aggregation.as_ref() {
261+
term_dict_field_names.extend(get_term_dict_field_names(aggregate));
262+
}
263+
term_dict_field_names
264+
}
261265
}
262266

263267
impl Collector for QuickwitCollector {

quickwit-search/src/leaf.rs

Lines changed: 107 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
use std::collections::{BTreeMap, HashSet};
2121
use std::path::PathBuf;
22+
use std::pin::Pin;
2223
use std::sync::Arc;
2324

2425
use anyhow::Context;
2526
use futures::future::try_join_all;
27+
use futures::Future;
2628
use itertools::{Either, Itertools};
2729
use once_cell::sync::OnceCell;
2830
use quickwit_config::get_searcher_config_instance;
@@ -36,12 +38,14 @@ use quickwit_storage::{
3638
};
3739
use tantivy::collector::Collector;
3840
use tantivy::directory::FileSlice;
41+
use tantivy::error::AsyncIoError;
3942
use tantivy::query::Query;
43+
use tantivy::schema::{Cardinality, FieldType};
4044
use tantivy::{Index, ReloadPolicy, Searcher, Term};
4145
use tokio::task::spawn_blocking;
4246
use tracing::*;
4347

44-
use crate::collector::{make_collector_for_split, make_merge_collector, GenericQuickwitCollector};
48+
use crate::collector::{make_collector_for_split, make_merge_collector};
4549
use crate::SearchError;
4650

4751
fn global_split_footer_cache() -> &'static MemorySizedCache<String> {
@@ -121,23 +125,83 @@ pub(crate) async fn open_index(
121125
///
122126
/// The downloaded data depends on the query (which term's posting list is required,
123127
/// are positions required too), and the collector.
128+
///
129+
/// * `query` - query is used to extract the terms and the fields which will be loaded from the
130+
/// inverted index.
131+
///
132+
/// * `term_dict_field_names` - The set of fields whose whole term dictionary needs to be loaded.
133+
/// This is e.g. required for term aggregation, since we don't know in advance which terms are going
134+
/// to be hit.
124135
#[instrument(skip(searcher, query, fast_field_names))]
125136
pub(crate) async fn warmup(
126137
searcher: &Searcher,
127138
query: &dyn Query,
128139
fast_field_names: &HashSet<String>,
140+
term_dict_field_names: &HashSet<String>,
129141
) -> anyhow::Result<()> {
130142
let warm_up_terms_future =
131143
warm_up_terms(searcher, query).instrument(debug_span!("warm_up_terms"));
144+
let warm_up_term_dict_future = warm_up_term_dict_fields(searcher, term_dict_field_names)
145+
.instrument(debug_span!("warm_up_term_dicts"));
132146
let warm_up_fastfields_future = warm_up_fastfields(searcher, fast_field_names)
133147
.instrument(debug_span!("warm_up_fastfields"));
134-
let (warm_up_terms_res, warm_up_fastfields_res) =
135-
tokio::join!(warm_up_terms_future, warm_up_fastfields_future);
148+
let (warm_up_terms_res, warm_up_fastfields_res, warm_up_term_dict_res) = tokio::join!(
149+
warm_up_terms_future,
150+
warm_up_fastfields_future,
151+
warm_up_term_dict_future
152+
);
136153
warm_up_terms_res?;
137154
warm_up_fastfields_res?;
155+
warm_up_term_dict_res?;
156+
Ok(())
157+
}
158+
159+
async fn warm_up_term_dict_fields(
160+
searcher: &Searcher,
161+
term_dict_field_names: &HashSet<String>,
162+
) -> anyhow::Result<()> {
163+
let mut term_dict_fields = Vec::new();
164+
for term_dict_field_name in term_dict_field_names.iter() {
165+
let term_dict_field = searcher
166+
.schema()
167+
.get_field(term_dict_field_name)
168+
.with_context(|| {
169+
format!(
170+
"Couldn't get field named {:?} from schema.",
171+
term_dict_field_name
172+
)
173+
})?;
174+
175+
term_dict_fields.push(term_dict_field);
176+
}
177+
178+
let mut warm_up_futures = Vec::new();
179+
for field in term_dict_fields {
180+
for segment_reader in searcher.segment_readers() {
181+
let inverted_index = segment_reader.inverted_index(field)?.clone();
182+
warm_up_futures.push(async move {
183+
let dict = inverted_index.terms();
184+
dict.warm_up_dictionary().await
185+
});
186+
}
187+
}
188+
try_join_all(warm_up_futures).await?;
138189
Ok(())
139190
}
140191

192+
pub fn get_fastfield_cardinality(field_type: &FieldType) -> Option<Cardinality> {
193+
match field_type {
194+
FieldType::U64(options) | FieldType::I64(options) | FieldType::F64(options) => {
195+
options.get_fastfield_cardinality()
196+
}
197+
FieldType::Date(options) => options.get_fastfield_cardinality(),
198+
FieldType::Facet(_) => Some(Cardinality::MultiValues),
199+
FieldType::Bytes(options) if options.is_fast() => Some(Cardinality::MultiValues),
200+
FieldType::Str(options) if options.is_fast() => Some(Cardinality::MultiValues),
201+
_ => None,
202+
}
203+
}
204+
141205
async fn warm_up_fastfields(
142206
searcher: &Searcher,
143207
fast_field_names: &HashSet<String>,
@@ -158,14 +222,42 @@ async fn warm_up_fastfields(
158222
if !field_entry.is_fast() {
159223
anyhow::bail!("Field {:?} is not a fast field.", fast_field_name);
160224
}
161-
fast_fields.push(fast_field);
225+
let cardinality =
226+
get_fastfield_cardinality(field_entry.field_type()).with_context(|| {
227+
format!(
228+
"Couldn't get field cardinality {:?} from type {:?}.",
229+
fast_field_name, field_entry
230+
)
231+
})?;
232+
233+
fast_fields.push((fast_field, cardinality));
162234
}
163235

164-
let mut warm_up_futures = Vec::new();
165-
for field in fast_fields {
236+
type SendebleFuture = dyn Future<Output = Result<OwnedBytes, AsyncIoError>> + Send;
237+
let mut warm_up_futures: Vec<Pin<Box<SendebleFuture>>> = Vec::new();
238+
for (field, cardinality) in fast_fields {
166239
for segment_reader in searcher.segment_readers() {
167-
let fast_field_slice = segment_reader.fast_fields().fast_field_data(field, 0)?;
168-
warm_up_futures.push(async move { fast_field_slice.read_bytes_async().await });
240+
match cardinality {
241+
Cardinality::SingleValue => {
242+
let fast_field_slice =
243+
segment_reader.fast_fields().fast_field_data(field, 0)?;
244+
warm_up_futures.push(Box::pin(async move {
245+
fast_field_slice.read_bytes_async().await
246+
}));
247+
}
248+
Cardinality::MultiValues => {
249+
let fast_field_slice =
250+
segment_reader.fast_fields().fast_field_data(field, 0)?;
251+
warm_up_futures.push(Box::pin(async move {
252+
fast_field_slice.read_bytes_async().await
253+
}));
254+
let fast_field_slice =
255+
segment_reader.fast_fields().fast_field_data(field, 1)?;
256+
warm_up_futures.push(Box::pin(async move {
257+
fast_field_slice.read_bytes_async().await
258+
}));
259+
}
260+
}
169261
}
170262
}
171263
try_join_all(warm_up_futures).await?;
@@ -218,7 +310,13 @@ async fn leaf_search_single_split(
218310
.reload_policy(ReloadPolicy::Manual)
219311
.try_into()?;
220312
let searcher = reader.searcher();
221-
warmup(&*searcher, &query, &quickwit_collector.fast_field_names()).await?;
313+
warmup(
314+
&*searcher,
315+
&query,
316+
&quickwit_collector.fast_field_names(),
317+
&quickwit_collector.term_dict_field_names(),
318+
)
319+
.await?;
222320
let leaf_search_response = crate::run_cpu_intensive(move || {
223321
let span = info_span!( "search", split_id = %split.split_id);
224322
let _span_guard = span.enter();

quickwit-search/src/search_stream/leaf.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ async fn leaf_search_stream_single_split(
159159
&*searcher,
160160
query.as_ref(),
161161
&request_fields.fast_fields_for_request(),
162+
&Default::default(),
162163
)
163164
.await?;
164165

0 commit comments

Comments
 (0)