From 1fc8ed23bbaa24262c63d230528d1ea939da9d0d Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 19 Apr 2022 12:51:51 +0800 Subject: [PATCH] handle dict preloading for agg handle cardinality for fields --- .../src/default_doc_mapper/default_mapper.rs | 9 +- .../default_doc_mapper/field_mapping_entry.rs | 5 + quickwit-search/src/collector.rs | 20 +-- quickwit-search/src/leaf.rs | 116 ++++++++++++++++-- quickwit-search/src/search_stream/leaf.rs | 1 + 5 files changed, 126 insertions(+), 25 deletions(-) diff --git a/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index 848d95b08dc..b3b9f210349 100644 --- a/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -147,14 +147,7 @@ impl DefaultDocMapperBuilder { let field_entries = root_field_mapping.compute_field_entries(); let required_fast_fields: Vec = field_entries .into_iter() - .filter(|(_, field_type)| match field_type { - FieldType::U64(options) - | FieldType::I64(options) - | FieldType::F64(options) - | FieldType::Date(options) => options.is_fast(), - FieldType::Bytes(option) => option.is_fast(), - _ => false, - }) + .filter(|(_, field_type)| field_type.is_fast()) .map(|(field_path, _)| field_path) .collect(); diff --git a/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs b/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs index 83dbc1dbbe9..37caaf4420a 100644 --- a/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs +++ b/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs @@ -526,6 +526,8 @@ pub struct QuickwitTextOptions { pub fieldnorms: bool, #[serde(default = "default_as_true")] pub stored: bool, + #[serde(default)] + pub fast: bool, } impl From for TextOptions { @@ -534,6 +536,9 @@ impl From for TextOptions { if quickwit_text_options.stored { text_options = text_options.set_stored(); } + if quickwit_text_options.fast { + text_options = text_options.set_fast(); + } if quickwit_text_options.indexed { let mut text_field_indexing = TextFieldIndexing::default(); if let Some(tokenizer_name) = quickwit_text_options.tokenizer { diff --git a/quickwit-search/src/collector.rs b/quickwit-search/src/collector.rs index 4d419c18137..7678a7663ad 100644 --- a/quickwit-search/src/collector.rs +++ b/quickwit-search/src/collector.rs @@ -23,7 +23,9 @@ use std::collections::{BinaryHeap, HashSet}; use itertools::Itertools; use quickwit_doc_mapper::{DocMapper, SortBy, SortOrder}; use quickwit_proto::{LeafSearchResponse, PartialHit, SearchRequest}; -use tantivy::aggregation::agg_req::{get_fast_field_names, Aggregations}; +use tantivy::aggregation::agg_req::{ + get_fast_field_names, get_term_dict_field_names, Aggregations, +}; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::aggregation::AggregationSegmentCollector; use tantivy::collector::{Collector, SegmentCollector}; @@ -228,11 +230,6 @@ impl SegmentCollector for QuickwitSegmentCollector { } } -// TODO: seems not very useful, remove it and refactor it. -pub trait GenericQuickwitCollector: Collector { - fn fast_field_names(&self) -> HashSet; -} - /// The quickwit collector is the tantivy Collector used in Quickwit. /// /// It defines the data that should be accumulated about the documents matching @@ -250,14 +247,21 @@ pub struct QuickwitCollector { pub aggregation: Option, } -impl GenericQuickwitCollector for QuickwitCollector { - fn fast_field_names(&self) -> HashSet { +impl QuickwitCollector { + pub fn fast_field_names(&self) -> HashSet { let mut fast_field_names = self.fast_field_names.clone(); if let Some(aggregate) = self.aggregation.as_ref() { fast_field_names.extend(get_fast_field_names(aggregate)); } fast_field_names } + pub fn term_dict_field_names(&self) -> HashSet { + let mut term_dict_field_names = HashSet::default(); + if let Some(aggregate) = self.aggregation.as_ref() { + term_dict_field_names.extend(get_term_dict_field_names(aggregate)); + } + term_dict_field_names + } } impl Collector for QuickwitCollector { diff --git a/quickwit-search/src/leaf.rs b/quickwit-search/src/leaf.rs index 35b10cd4ec1..8a92074ccc7 100644 --- a/quickwit-search/src/leaf.rs +++ b/quickwit-search/src/leaf.rs @@ -19,10 +19,12 @@ use std::collections::{BTreeMap, HashSet}; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; use anyhow::Context; use futures::future::try_join_all; +use futures::Future; use itertools::{Either, Itertools}; use once_cell::sync::OnceCell; use quickwit_config::get_searcher_config_instance; @@ -36,12 +38,14 @@ use quickwit_storage::{ }; use tantivy::collector::Collector; use tantivy::directory::FileSlice; +use tantivy::error::AsyncIoError; use tantivy::query::Query; +use tantivy::schema::{Cardinality, FieldType}; use tantivy::{Index, ReloadPolicy, Searcher, Term}; use tokio::task::spawn_blocking; use tracing::*; -use crate::collector::{make_collector_for_split, make_merge_collector, GenericQuickwitCollector}; +use crate::collector::{make_collector_for_split, make_merge_collector}; use crate::SearchError; fn global_split_footer_cache() -> &'static MemorySizedCache { @@ -121,23 +125,83 @@ pub(crate) async fn open_index( /// /// The downloaded data depends on the query (which term's posting list is required, /// are position required too), and the collector. +/// +/// * `query` - query is used to extract the terms and the fields which will be loaded from the +/// inverted_index. +/// +/// * `term_dict_field_names` - A list of fields, where the whole dictionary needs to be loaded. +/// This is e.g. required for term aggregation, since we don't know in advance which terms are going +/// to be hit. #[instrument(skip(searcher, query, fast_field_names))] pub(crate) async fn warmup( searcher: &Searcher, query: &dyn Query, fast_field_names: &HashSet, + term_dict_field_names: &HashSet, ) -> anyhow::Result<()> { let warm_up_terms_future = warm_up_terms(searcher, query).instrument(debug_span!("warm_up_terms")); + let warm_up_term_dict_future = warm_up_term_dict_fields(searcher, term_dict_field_names) + .instrument(debug_span!("warm_up_term_dicts")); let warm_up_fastfields_future = warm_up_fastfields(searcher, fast_field_names) .instrument(debug_span!("warm_up_fastfields")); - let (warm_up_terms_res, warm_up_fastfields_res) = - tokio::join!(warm_up_terms_future, warm_up_fastfields_future); + let (warm_up_terms_res, warm_up_fastfields_res, warm_up_term_dict_res) = tokio::join!( + warm_up_terms_future, + warm_up_fastfields_future, + warm_up_term_dict_future + ); warm_up_terms_res?; warm_up_fastfields_res?; + warm_up_term_dict_res?; + Ok(()) +} + +async fn warm_up_term_dict_fields( + searcher: &Searcher, + term_dict_field_names: &HashSet, +) -> anyhow::Result<()> { + let mut term_dict_fields = Vec::new(); + for term_dict_field_name in term_dict_field_names.iter() { + let term_dict_field = searcher + .schema() + .get_field(term_dict_field_name) + .with_context(|| { + format!( + "Couldn't get field named {:?} from schema.", + term_dict_field_name + ) + })?; + + term_dict_fields.push(term_dict_field); + } + + let mut warm_up_futures = Vec::new(); + for field in term_dict_fields { + for segment_reader in searcher.segment_readers() { + let inverted_index = segment_reader.inverted_index(field)?.clone(); + warm_up_futures.push(async move { + let dict = inverted_index.terms(); + dict.warm_up_dictionary().await + }); + } + } + try_join_all(warm_up_futures).await?; Ok(()) } +pub fn get_fastfield_cardinality(field_type: &FieldType) -> Option { + match field_type { + FieldType::U64(options) | FieldType::I64(options) | FieldType::F64(options) => { + options.get_fastfield_cardinality() + } + FieldType::Date(options) => options.get_fastfield_cardinality(), + FieldType::Facet(_) => Some(Cardinality::MultiValues), + FieldType::Bytes(options) if options.is_fast() => Some(Cardinality::MultiValues), + FieldType::Str(options) if options.is_fast() => Some(Cardinality::MultiValues), + _ => None, + } +} + async fn warm_up_fastfields( searcher: &Searcher, fast_field_names: &HashSet, @@ -158,14 +222,42 @@ async fn warm_up_fastfields( if !field_entry.is_fast() { anyhow::bail!("Field {:?} is not a fast field.", fast_field_name); } - fast_fields.push(fast_field); + let cardinality = + get_fastfield_cardinality(field_entry.field_type()).with_context(|| { + format!( + "Couldn't get field cardinality {:?} from type {:?}.", + fast_field_name, field_entry + ) + })?; + + fast_fields.push((fast_field, cardinality)); } - let mut warm_up_futures = Vec::new(); - for field in fast_fields { + type SendebleFuture = dyn Future> + Send; + let mut warm_up_futures: Vec>> = Vec::new(); + for (field, cardinality) in fast_fields { for segment_reader in searcher.segment_readers() { - let fast_field_slice = segment_reader.fast_fields().fast_field_data(field, 0)?; - warm_up_futures.push(async move { fast_field_slice.read_bytes_async().await }); + match cardinality { + Cardinality::SingleValue => { + let fast_field_slice = + segment_reader.fast_fields().fast_field_data(field, 0)?; + warm_up_futures.push(Box::pin(async move { + fast_field_slice.read_bytes_async().await + })); + } + Cardinality::MultiValues => { + let fast_field_slice = + segment_reader.fast_fields().fast_field_data(field, 0)?; + warm_up_futures.push(Box::pin(async move { + fast_field_slice.read_bytes_async().await + })); + let fast_field_slice = + segment_reader.fast_fields().fast_field_data(field, 1)?; + warm_up_futures.push(Box::pin(async move { + fast_field_slice.read_bytes_async().await + })); + } + } } } try_join_all(warm_up_futures).await?; @@ -218,7 +310,13 @@ async fn leaf_search_single_split( .reload_policy(ReloadPolicy::Manual) .try_into()?; let searcher = reader.searcher(); - warmup(&*searcher, &query, &quickwit_collector.fast_field_names()).await?; + warmup( + &*searcher, + &query, + &quickwit_collector.fast_field_names(), + &quickwit_collector.term_dict_field_names(), + ) + .await?; let leaf_search_response = crate::run_cpu_intensive(move || { let span = info_span!( "search", split_id = %split.split_id); let _span_guard = span.enter(); diff --git a/quickwit-search/src/search_stream/leaf.rs b/quickwit-search/src/search_stream/leaf.rs index 7d808f2ff65..304ac1b819d 100644 --- a/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit-search/src/search_stream/leaf.rs @@ -159,6 +159,7 @@ async fn leaf_search_stream_single_split( &*searcher, query.as_ref(), &request_fields.fast_fields_for_request(), + &Default::default(), ) .await?;