Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,7 @@ impl DefaultDocMapperBuilder {
let field_entries = root_field_mapping.compute_field_entries();
let required_fast_fields: Vec<FieldPath> = field_entries
.into_iter()
.filter(|(_, field_type)| match field_type {
FieldType::U64(options)
| FieldType::I64(options)
| FieldType::F64(options)
| FieldType::Date(options) => options.is_fast(),
FieldType::Bytes(option) => option.is_fast(),
_ => false,
})
.filter(|(_, field_type)| field_type.is_fast())
.map(|(field_path, _)| field_path)
.collect();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,8 @@ pub struct QuickwitTextOptions {
pub fieldnorms: bool,
#[serde(default = "default_as_true")]
pub stored: bool,
#[serde(default)]
pub fast: bool,
}

impl From<QuickwitTextOptions> for TextOptions {
Expand All @@ -534,6 +536,9 @@ impl From<QuickwitTextOptions> for TextOptions {
if quickwit_text_options.stored {
text_options = text_options.set_stored();
}
if quickwit_text_options.fast {
text_options = text_options.set_fast();
}
if quickwit_text_options.indexed {
let mut text_field_indexing = TextFieldIndexing::default();
if let Some(tokenizer_name) = quickwit_text_options.tokenizer {
Expand Down
20 changes: 12 additions & 8 deletions quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::collections::{BinaryHeap, HashSet};
use itertools::Itertools;
use quickwit_doc_mapper::{DocMapper, SortBy, SortOrder};
use quickwit_proto::{LeafSearchResponse, PartialHit, SearchRequest};
use tantivy::aggregation::agg_req::{get_fast_field_names, Aggregations};
use tantivy::aggregation::agg_req::{
get_fast_field_names, get_term_dict_field_names, Aggregations,
};
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use tantivy::aggregation::AggregationSegmentCollector;
use tantivy::collector::{Collector, SegmentCollector};
Expand Down Expand Up @@ -228,11 +230,6 @@ impl SegmentCollector for QuickwitSegmentCollector {
}
}

// TODO: seems not very useful, remove it and refactor it.
pub trait GenericQuickwitCollector: Collector {
fn fast_field_names(&self) -> HashSet<String>;
}

/// The quickwit collector is the tantivy Collector used in Quickwit.
///
/// It defines the data that should be accumulated about the documents matching
Expand All @@ -250,14 +247,21 @@ pub struct QuickwitCollector {
pub aggregation: Option<Aggregations>,
}

impl GenericQuickwitCollector for QuickwitCollector {
fn fast_field_names(&self) -> HashSet<String> {
impl QuickwitCollector {
pub fn fast_field_names(&self) -> HashSet<String> {
let mut fast_field_names = self.fast_field_names.clone();
if let Some(aggregate) = self.aggregation.as_ref() {
fast_field_names.extend(get_fast_field_names(aggregate));
}
fast_field_names
}
pub fn term_dict_field_names(&self) -> HashSet<String> {
let mut term_dict_field_names = HashSet::default();
if let Some(aggregate) = self.aggregation.as_ref() {
term_dict_field_names.extend(get_term_dict_field_names(aggregate));
}
term_dict_field_names
}
}

impl Collector for QuickwitCollector {
Expand Down
116 changes: 107 additions & 9 deletions quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

use std::collections::{BTreeMap, HashSet};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::Context;
use futures::future::try_join_all;
use futures::Future;
use itertools::{Either, Itertools};
use once_cell::sync::OnceCell;
use quickwit_config::get_searcher_config_instance;
Expand All @@ -36,12 +38,14 @@ use quickwit_storage::{
};
use tantivy::collector::Collector;
use tantivy::directory::FileSlice;
use tantivy::error::AsyncIoError;
use tantivy::query::Query;
use tantivy::schema::{Cardinality, FieldType};
use tantivy::{Index, ReloadPolicy, Searcher, Term};
use tokio::task::spawn_blocking;
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, GenericQuickwitCollector};
use crate::collector::{make_collector_for_split, make_merge_collector};
use crate::SearchError;

fn global_split_footer_cache() -> &'static MemorySizedCache<String> {
Expand Down Expand Up @@ -121,23 +125,83 @@ pub(crate) async fn open_index(
///
/// The downloaded data depends on the query (which term's posting list is required,
/// are position required too), and the collector.
///
/// * `query` - query is used to extract the terms and the fields which will be loaded from the
/// inverted_index.
///
/// * `term_dict_field_names` - A list of fields, where the whole dictionary needs to be loaded.
/// This is e.g. required for term aggregation, since we don't know in advance which terms are going
/// to be hit.
#[instrument(skip(searcher, query, fast_field_names))]
pub(crate) async fn warmup(
searcher: &Searcher,
query: &dyn Query,
fast_field_names: &HashSet<String>,
term_dict_field_names: &HashSet<String>,
) -> anyhow::Result<()> {
let warm_up_terms_future =
warm_up_terms(searcher, query).instrument(debug_span!("warm_up_terms"));
let warm_up_term_dict_future = warm_up_term_dict_fields(searcher, term_dict_field_names)
.instrument(debug_span!("warm_up_term_dicts"));
let warm_up_fastfields_future = warm_up_fastfields(searcher, fast_field_names)
.instrument(debug_span!("warm_up_fastfields"));
let (warm_up_terms_res, warm_up_fastfields_res) =
tokio::join!(warm_up_terms_future, warm_up_fastfields_future);
let (warm_up_terms_res, warm_up_fastfields_res, warm_up_term_dict_res) = tokio::join!(
warm_up_terms_future,
warm_up_fastfields_future,
warm_up_term_dict_future
);
warm_up_terms_res?;
warm_up_fastfields_res?;
warm_up_term_dict_res?;
Ok(())
}

async fn warm_up_term_dict_fields(
searcher: &Searcher,
term_dict_field_names: &HashSet<String>,
) -> anyhow::Result<()> {
let mut term_dict_fields = Vec::new();
for term_dict_field_name in term_dict_field_names.iter() {
let term_dict_field = searcher
.schema()
.get_field(term_dict_field_name)
.with_context(|| {
format!(
"Couldn't get field named {:?} from schema.",
term_dict_field_name
)
})?;

term_dict_fields.push(term_dict_field);
}

let mut warm_up_futures = Vec::new();
for field in term_dict_fields {
for segment_reader in searcher.segment_readers() {
let inverted_index = segment_reader.inverted_index(field)?.clone();
warm_up_futures.push(async move {
let dict = inverted_index.terms();
dict.warm_up_dictionary().await
});
}
}
try_join_all(warm_up_futures).await?;
Ok(())
}

pub fn get_fastfield_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match field_type {
FieldType::U64(options) | FieldType::I64(options) | FieldType::F64(options) => {
options.get_fastfield_cardinality()
}
FieldType::Date(options) => options.get_fastfield_cardinality(),
FieldType::Facet(_) => Some(Cardinality::MultiValues),
FieldType::Bytes(options) if options.is_fast() => Some(Cardinality::MultiValues),
FieldType::Str(options) if options.is_fast() => Some(Cardinality::MultiValues),
_ => None,
}
}

async fn warm_up_fastfields(
searcher: &Searcher,
fast_field_names: &HashSet<String>,
Expand All @@ -158,14 +222,42 @@ async fn warm_up_fastfields(
if !field_entry.is_fast() {
anyhow::bail!("Field {:?} is not a fast field.", fast_field_name);
}
fast_fields.push(fast_field);
let cardinality =
get_fastfield_cardinality(field_entry.field_type()).with_context(|| {
format!(
"Couldn't get field cardinality {:?} from type {:?}.",
fast_field_name, field_entry
)
})?;

fast_fields.push((fast_field, cardinality));
}

let mut warm_up_futures = Vec::new();
for field in fast_fields {
type SendebleFuture = dyn Future<Output = Result<OwnedBytes, AsyncIoError>> + Send;
let mut warm_up_futures: Vec<Pin<Box<SendebleFuture>>> = Vec::new();
for (field, cardinality) in fast_fields {
for segment_reader in searcher.segment_readers() {
let fast_field_slice = segment_reader.fast_fields().fast_field_data(field, 0)?;
warm_up_futures.push(async move { fast_field_slice.read_bytes_async().await });
match cardinality {
Cardinality::SingleValue => {
let fast_field_slice =
segment_reader.fast_fields().fast_field_data(field, 0)?;
warm_up_futures.push(Box::pin(async move {
fast_field_slice.read_bytes_async().await
}));
}
Cardinality::MultiValues => {
let fast_field_slice =
segment_reader.fast_fields().fast_field_data(field, 0)?;
warm_up_futures.push(Box::pin(async move {
fast_field_slice.read_bytes_async().await
}));
let fast_field_slice =
segment_reader.fast_fields().fast_field_data(field, 1)?;
warm_up_futures.push(Box::pin(async move {
fast_field_slice.read_bytes_async().await
}));
}
}
}
}
try_join_all(warm_up_futures).await?;
Expand Down Expand Up @@ -218,7 +310,13 @@ async fn leaf_search_single_split(
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();
warmup(&*searcher, &query, &quickwit_collector.fast_field_names()).await?;
warmup(
&*searcher,
&query,
&quickwit_collector.fast_field_names(),
&quickwit_collector.term_dict_field_names(),
)
.await?;
let leaf_search_response = crate::run_cpu_intensive(move || {
let span = info_span!( "search", split_id = %split.split_id);
let _span_guard = span.enter();
Expand Down
1 change: 1 addition & 0 deletions quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn leaf_search_stream_single_split(
&*searcher,
query.as_ref(),
&request_fields.fast_fields_for_request(),
&Default::default(),
)
.await?;

Expand Down