diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index 50da9d5851c..31097e2e597 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -12,8 +12,9 @@ In addition to the `index_id`, the index configuration lets you define five item - The **doc mapping**: it defines how a document and the fields it contains are stored and indexed for a given index. - The **indexing settings**: it defines the timestamp field used for sharding, and some more advanced parameters like the merge policy. - The **search settings**: it defines the default search fields `default_search_fields`, a list of fields that Quickwit will search into if the user query does not explicitly target a field. +- The **retention policy**: it defines how long Quickwit should keep the indexed data. If not specified, the data is stored forever. -Configuration is set at index creation and cannot be modified with the current version of Quickwit. +Configuration is generally set at index creation and cannot be modified, except for some specific attributes like the search settings and retention policy, which can be changed using the [update endpoint](../reference/rest-api.md) or the [CLI](../reference/cli.md). ## Config file format diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 6b289fb7024..651e162013d 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -116,7 +116,7 @@ quickwit run | Option | Description | Default | |-----------------|-------------|--------:| | `--config` | Config file location | `config/quickwit.yaml` | -| `--service` | Services (indexer,searcher,janitor,metastore or control-plane) to run. If unspecified, all the supported services are started. | | +| `--service` | Services (`indexer`, `searcher`, `metastore`, `control-plane`, or `janitor`) to run. If unspecified, all the supported services are started. | | *Examples* @@ -141,7 +141,7 @@ curl "http://127.0.0.1:7280/api/v1/wikipedia/search?query=barack+obama" ``` ## index -Manages indexes: creates, deletes, ingests, searches, describes... +Manages indexes: creates, updates, deletes, ingests, searches, describes... ### index create @@ -180,6 +180,51 @@ quickwit index create --endpoint=http://127.0.0.1:7280 --index-config wikipedia_ ``` +### index update + +`quickwit index update [args]` +#### index update search-settings + +Updates default search settings. +`quickwit index update search-settings [args]` + +*Synopsis* + +```bash +quickwit index update search-settings + --index + --default-search-fields +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | +| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. "field1 field2". If no value is provided, existing defaults are removed and queries without target field will fail. | +#### index update retention-policy + +Configure or disable the retention policy. +`quickwit index update retention-policy [args]` + +*Synopsis* + +```bash +quickwit index update retention-policy + --index + [--period ] + [--schedule ] + [--disable] +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | +| `--period` | Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...) | +| `--schedule` | Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...). | +| `--disable` | Disable the retention policy. Old indexed data will not be cleaned up anymore. | ### index clear Clears an index: deletes all splits and resets checkpoint. @@ -661,8 +706,8 @@ quickwit split list | Option | Description | |-----------------|-------------| | `--index` | Target index ID | -| `--offset` | Number of splits to skip | -| `--limit` | Maximum number of splits to retrieve | +| `--offset` | Number of splits to skip. | +| `--limit` | Maximum number of splits to retrieve. | | `--states` | Selects the splits whose states are included in this comma-separated list of states. Possible values are `staged`, `published`, and `marked`. | | `--create-date` | Selects the splits whose creation dates are before this date. | | `--start-date` | Selects the splits that contain documents after this date (time-series indexes only). | diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index b58cdafe66d..0408851d73d 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -223,13 +223,13 @@ The response is a JSON object, and the content type is `application/json; charse POST api/v1/indexes ``` -Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json`) and YAML `content-type: application/yaml`. +Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. #### POST payload -| Variable | Type | Description | Default value | -|-----------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| -| `version` | `String` | Config format version, use the same as your Quickwit version. (mandatory) | | +| Variable | Type | Description | Default value | +|---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| `version` | `String` | Config format version, use the same as your Quickwit version. (mandatory) | | | `index_id` | `String` | Index ID, see its [validation rules](../configuration/index-config.md#index-id) on identifiers. (mandatory) | | | `index_uri` | `String` | Defines where the index files are stored. This parameter expects a [storage URI](../configuration/storage-config.md#storage-uris). | `{default_index_root_uri}/{index_id}` | | `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping) (mandatory) | | @@ -301,8 +301,62 @@ curl -XPOST http://0.0.0.0:8080/api/v1/indexes --data @index_config.json -H "Con The response is the index metadata of the created index, and the content type is `application/json; charset=UTF-8.` -| Field | Description | Type | -|----------------------|-------------------------------------------|:---------------------:| +| Field | Description | Type | +|----------------------|-----------------------------------------|:---------------------:| +| `index_config` | The posted index config. | `IndexConfig` | +| `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | +| `create_timestamp` | Index creation timestamp | `number` | +| `sources` | List of the index sources configurations. | `Array` | + + +### Update an index (search settings and retention policy only) + +``` +PUT api/v1/indexes/ +``` + +Updates the search settings and retention policy of an index. This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. + +#### PUT payload + +| Variable | Type | Description | Default value | +|---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | +| `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | + + +**Payload Example** + +curl -XPUT http://0.0.0.0:8080/api/v1/indexes --data @index_update.json -H "Content-Type: application/json" + +```json title="index_update.json +{ + "search_settings": { + "default_search_fields": ["body"] + }, + "retention": { + "period": "3 days", + "schedule": "@daily" + } +} +``` + +:::warning +Calling the update endpoint with the following payload will remove the current retention policy. +```json +{ + "search_settings": { + "default_search_fields": ["body"] + } +} +``` + +#### Response + +The response is the index metadata of the updated index, and the content type is `application/json; charset=UTF-8.` + +| Field | Description | Type | +|----------------------|-----------------------------------------|:---------------------:| | `index_config` | The posted index config. | `IndexConfig` | | `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | | `create_timestamp` | Index creation timestamp | `number` | diff --git a/quickwit/quickwit-cli/src/generate_markdown.rs b/quickwit/quickwit-cli/src/generate_markdown.rs index fe22a753ed1..03ea5bf5cbb 100644 --- a/quickwit/quickwit-cli/src/generate_markdown.rs +++ b/quickwit/quickwit-cli/src/generate_markdown.rs @@ -43,11 +43,13 @@ fn markdown_for_subcommand( subcommand: &Command, command_group: Vec, doc_extensions: &toml::Value, + level: usize, ) { let subcommand_name = subcommand.get_name(); let command_name = format!("{} {}", command_group.join(" "), subcommand_name); - println!("### {command_name}\n"); + let header_level = "#".repeat(level); + println!("{header_level} {command_name}\n"); let subcommand_ext: Option<&Value> = { let mut val_opt: Option<&Value> = doc_extensions.get(command_group[0].to_string()); @@ -195,20 +197,20 @@ fn generate_markdown_from_clap(command: &Command) { continue; } - let excluded_doc_commands = ["merge"]; + let excluded_doc_commands = ["merge", "local-search"]; for subcommand in command .get_subcommands() .filter(|subcommand| !excluded_doc_commands.contains(&subcommand.get_name())) { let commands = vec![command.get_name().to_string()]; - markdown_for_subcommand(subcommand, commands, &doc_extensions); + markdown_for_subcommand(subcommand, commands, &doc_extensions, 3); for subsubcommand in subcommand.get_subcommands() { let commands = vec![ command.get_name().to_string(), subcommand.get_name().to_string(), ]; - markdown_for_subcommand(subsubcommand, commands, &doc_extensions); + markdown_for_subcommand(subsubcommand, commands, &doc_extensions, 4); } } } diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index/mod.rs similarity index 98% rename from quickwit/quickwit-cli/src/index.rs rename to quickwit/quickwit-cli/src/index/mod.rs index 6a5fda6bb9d..5102119bbe3 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index/mod.rs @@ -54,13 +54,16 @@ use tabled::{Table, Tabled}; use thousands::Separable; use tracing::{debug, Level}; +use self::update::{build_index_update_command, IndexUpdateCliCommand}; use crate::checklist::GREEN_COLOR; use crate::stats::{mean, percentile, std_deviation}; use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE}; +pub mod update; + pub fn build_index_command() -> Command { Command::new("index") - .about("Manages indexes: creates, deletes, ingests, searches, describes...") + .about("Manages indexes: creates, updates, deletes, ingests, searches, describes...") .args(client_args()) .subcommand( Command::new("create") @@ -74,9 +77,12 @@ pub fn build_index_command() -> Command { .required(false), ]) ) + .subcommand( + build_index_update_command().display_order(2) + ) .subcommand( Command::new("clear") - .display_order(2) + .display_order(3) .alias("clr") .about("Clears an index: deletes all splits and resets checkpoint.") .long_about("Deletes all its splits and resets its checkpoint. This operation is destructive and cannot be undone, proceed with caution.") @@ -88,7 +94,7 @@ pub fn build_index_command() -> Command { ) .subcommand( Command::new("delete") - .display_order(3) + .display_order(4) .alias("del") .about("Deletes an index.") .long_about("Deletes an index. This operation is destructive and cannot be undone, proceed with caution.") @@ -102,7 +108,7 @@ pub fn build_index_command() -> Command { ) .subcommand( Command::new("describe") - .display_order(4) + .display_order(5) .about("Displays descriptive statistics of an index.") .long_about("Displays descriptive statistics of an index. Displayed statistics are: number of published splits, number of documents, splits min/max timestamps, size of splits.") .args(&[ @@ -113,12 +119,12 @@ pub fn build_index_command() -> Command { .subcommand( Command::new("list") .alias("ls") - .display_order(5) + .display_order(6) .about("List indexes.") ) .subcommand( Command::new("ingest") - .display_order(6) + .display_order(7) .about("Ingest NDJSON documents with the ingest API.") .long_about("Reads NDJSON documents from a file or streamed from stdin and sends them into ingest API.") .args(&[ @@ -148,14 +154,14 @@ pub fn build_index_command() -> Command { .conflicts_with("wait"), Arg::new("commit-timeout") .long("commit-timeout") - .help("Duration of the commit timeout operation.") + .help("Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before commiting splits after their creation.") .required(false) .global(true), ]) ) .subcommand( Command::new("search") - .display_order(7) + .display_order(8) .about("Searches an index.") .args(&[ arg!(--index "ID of the target index") @@ -256,6 +262,7 @@ pub enum IndexCliCommand { Ingest(IngestDocsArgs), List(ListIndexesArgs), Search(SearchIndexArgs), + Update(IndexUpdateCliCommand), } impl IndexCliCommand { @@ -278,6 +285,7 @@ impl IndexCliCommand { "ingest" => Self::parse_ingest_args(submatches), "list" => Self::parse_list_args(submatches), "search" => Self::parse_search_args(submatches), + "update" => Ok(Self::Update(IndexUpdateCliCommand::parse_args(submatches)?)), _ => bail!("unknown index subcommand `{subcommand}`"), } } @@ -438,6 +446,7 @@ impl IndexCliCommand { Self::Ingest(args) => ingest_docs_cli(args).await, Self::List(args) => list_index_cli(args).await, Self::Search(args) => search_index_cli(args).await, + Self::Update(args) => args.execute().await, } } } diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs new file mode 100644 index 00000000000..c79f30357ea --- /dev/null +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -0,0 +1,256 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use anyhow::{bail, Context}; +use clap::{arg, ArgMatches, Command}; +use colored::Colorize; +use quickwit_config::{RetentionPolicy, SearchSettings}; +use quickwit_serve::IndexUpdates; +use tracing::debug; + +use crate::checklist::GREEN_COLOR; +use crate::ClientArgs; + +pub fn build_index_update_command() -> Command { + Command::new("update") + .subcommand_required(true) + .subcommand( + Command::new("search-settings") + .about("Updates default search settings.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. \"field1 field2\". If no value is provided, existing defaults are removed and queries without target field will fail.") + .display_order(2) + .num_args(0..) + .required(true), + ])) + .subcommand( + Command::new("retention-policy") + .about("Configures or disables the retention policy.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--"period" "Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...)") + .display_order(2) + .required(false), + arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") + .display_order(3) + .required(false), + arg!(--"disable" "Disables the retention policy. Old indexed data will not be cleaned up anymore.") + .display_order(4) + .required(false), + ]) + ) +} + +#[derive(Debug, Eq, PartialEq)] +pub struct RetentionPolicyArgs { + pub client_args: ClientArgs, + pub index_id: String, + pub disable: bool, + pub period: Option, + pub schedule: Option, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct SearchSettingsArgs { + pub client_args: ClientArgs, + pub index_id: String, + pub default_search_fields: Vec, +} + +#[derive(Debug, Eq, PartialEq)] +pub enum IndexUpdateCliCommand { + RetentionPolicy(RetentionPolicyArgs), + SearchSettings(SearchSettingsArgs), +} + +impl IndexUpdateCliCommand { + pub fn parse_args(mut matches: ArgMatches) -> anyhow::Result { + let (subcommand, submatches) = matches + .remove_subcommand() + .context("failed to parse index update subcommand")?; + match subcommand.as_str() { + "retention-policy" => Self::parse_update_retention_policy_args(submatches), + "search-settings" => Self::parse_update_search_settings_args(submatches), + _ => bail!("unknown index update subcommand `{subcommand}`"), + } + } + + fn parse_update_retention_policy_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let disable = matches.get_flag("disable"); + let period = matches.remove_one::("period"); + let schedule = matches.remove_one::("schedule"); + Ok(Self::RetentionPolicy(RetentionPolicyArgs { + client_args, + index_id, + disable, + period, + schedule, + })) + } + + fn parse_update_search_settings_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let default_search_fields = matches + .remove_many::("default-search-fields") + .map(|values| values.collect()) + // --default-search-fields should be made optional if other fields + // are added to SearchSettings + .expect("`default-search-fields` should be a required arg."); + Ok(Self::SearchSettings(SearchSettingsArgs { + client_args, + index_id, + default_search_fields, + })) + } + + pub async fn execute(self) -> anyhow::Result<()> { + match self { + Self::RetentionPolicy(args) => update_retention_policy_cli(args).await, + Self::SearchSettings(args) => update_search_settings_cli(args).await, + } + } +} + +pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-retention-policy"); + println!("❯ Updating index retention policy..."); + let qw_client = args.client_args.client(); + let metadata = qw_client.indexes().get(&args.index_id).await?; + let new_retention_policy_opt = match ( + args.disable, + args.period, + args.schedule, + metadata.index_config.retention_policy_opt, + ) { + (true, Some(_), Some(_), _) | (true, None, Some(_), _) | (true, Some(_), None, _) => { + bail!("`--period` and `--schedule` cannot be used together with `--disable`") + } + (false, None, None, _) => bail!("either `--period` or `--disable` must be specified"), + (false, None, Some(_), None) => { + bail!("`--period` is required when creating a retention policy") + } + (true, None, None, _) => None, + (false, None, Some(schedule), Some(policy)) => Some(RetentionPolicy { + retention_period: policy.retention_period, + evaluation_schedule: schedule, + }), + (false, Some(period), schedule_opt, None) => Some(RetentionPolicy { + retention_period: period, + evaluation_schedule: schedule_opt.unwrap_or(RetentionPolicy::default_schedule()), + }), + (false, Some(period), schedule_opt, Some(policy)) => Some(RetentionPolicy { + retention_period: period, + evaluation_schedule: schedule_opt.unwrap_or(policy.evaluation_schedule.clone()), + }), + }; + if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { + println!( + "New retention policy: {}", + serde_json::to_string(&new_retention_policy)? + ); + } else { + println!("Disable retention policy."); + } + qw_client + .indexes() + .update( + &args.index_id, + IndexUpdates { + retention_policy_opt: new_retention_policy_opt, + search_settings: metadata.index_config.search_settings, + }, + ) + .await?; + println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + Ok(()) +} + +pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-search-settings"); + println!("❯ Updating index search settings..."); + let qw_client = args.client_args.client(); + let metadata = qw_client.indexes().get(&args.index_id).await?; + let search_settings = SearchSettings { + default_search_fields: args.default_search_fields, + }; + println!( + "New search settings: {}", + serde_json::to_string(&search_settings)? + ); + qw_client + .indexes() + .update( + &args.index_id, + IndexUpdates { + retention_policy_opt: metadata.index_config.retention_policy_opt, + search_settings, + }, + ) + .await?; + println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::cli::{build_cli, CliCommand}; + use crate::index::IndexCliCommand; + + #[test] + fn test_cmd_update_subsubcommand() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from([ + "index", + "update", + "retention-policy", + "--index", + "my-index", + "--period", + "1 day", + ]) + .unwrap(); + let command = CliCommand::parse_cli_args(matches).unwrap(); + assert!(matches!( + command, + CliCommand::Index(IndexCliCommand::Update( + IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { + client_args: _, + index_id, + disable: false, + period: Some(period), + schedule: None, + }) + )) if &index_id == "my-index" && &period == "1 day" + )); + } +} diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index fa8690dd140..848934c6a28 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -29,6 +29,7 @@ use clap::error::ErrorKind; use helpers::{TestEnv, TestStorageType}; use quickwit_cli::checklist::ChecklistError; use quickwit_cli::cli::build_cli; +use quickwit_cli::index::update::{update_retention_policy_cli, RetentionPolicyArgs}; use quickwit_cli::index::{ create_index_cli, delete_index_cli, search_index, CreateIndexArgs, DeleteIndexArgs, SearchIndexArgs, @@ -40,7 +41,7 @@ use quickwit_cli::ClientArgs; use quickwit_common::fs::get_cache_directory_path; use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; -use quickwit_config::{SourceInputFormat, CLI_SOURCE_ID}; +use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID}; use quickwit_metastore::{ ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, StageSplitsRequestExt, @@ -533,6 +534,69 @@ async fn test_search_index_cli() { assert_eq!(search_res.num_hits, 0); } +#[tokio::test] +async fn test_cmd_update_index() { + quickwit_common::setup_logging_for_tests(); + let index_id = append_random_suffix("test-update-cmd"); + let test_env = create_test_env(index_id.clone(), TestStorageType::LocalFileSystem) + .await + .unwrap(); + test_env.start_server().await.unwrap(); + create_logs_index(&test_env).await.unwrap(); + + // add a policy + update_retention_policy_cli(RetentionPolicyArgs { + index_id: index_id.clone(), + client_args: ClientArgs { + cluster_endpoint: test_env.cluster_endpoint.clone(), + ..Default::default() + }, + disable: false, + period: Some(String::from("1 week")), + schedule: Some(String::from("daily")), + }) + .await + .unwrap(); + let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!( + index_metadata.index_config.retention_policy_opt, + Some(RetentionPolicy { + retention_period: String::from("1 week"), + evaluation_schedule: String::from("daily") + }) + ); + + // invalid args + update_retention_policy_cli(RetentionPolicyArgs { + index_id: index_id.clone(), + client_args: ClientArgs { + cluster_endpoint: test_env.cluster_endpoint.clone(), + ..Default::default() + }, + disable: true, + period: Some(String::from("a week")), + schedule: Some(String::from("daily")), + }) + .await + .unwrap_err(); + + // remove the policy + update_retention_policy_cli(RetentionPolicyArgs { + index_id, + client_args: ClientArgs { + cluster_endpoint: test_env.cluster_endpoint.clone(), + ..Default::default() + }, + disable: true, + period: None, + schedule: None, + }) + .await + .unwrap(); + let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!(index_metadata.index_config.retention_policy_opt, None); +} + #[tokio::test] async fn test_delete_index_cli_dry_run() { quickwit_common::setup_logging_for_tests(); diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index e0807820009..2aa295e49b6 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -239,7 +239,7 @@ pub struct RetentionPolicy { } impl RetentionPolicy { - fn default_schedule() -> String { + pub fn default_schedule() -> String { "hourly".to_string() } diff --git a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs new file mode 100644 index 00000000000..5ddc8a034ec --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs @@ -0,0 +1,146 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashSet; +use std::time::Duration; + +use quickwit_config::service::QuickwitService; +use quickwit_config::SearchSettings; +use quickwit_rest_client::rest_client::CommitType; +use quickwit_serve::{IndexUpdates, SearchRequestQueryString}; +use serde_json::json; + +use crate::ingest_json; +use crate::test_utils::{ingest_with_retry, ClusterSandbox}; + +#[tokio::test] +async fn test_update_on_multi_nodes_cluster() { + quickwit_common::setup_logging_for_tests(); + let nodes_services = vec![ + HashSet::from_iter([QuickwitService::Searcher]), + HashSet::from_iter([QuickwitService::Metastore]), + HashSet::from_iter([QuickwitService::Indexer]), + HashSet::from_iter([QuickwitService::ControlPlane]), + HashSet::from_iter([QuickwitService::Janitor]), + ]; + let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + .await + .unwrap(); + sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); + + { + // Wait for indexer to fully start. + // The starting time is a bit long for a cluster. + tokio::time::sleep(Duration::from_secs(3)).await; + let indexing_service_counters = sandbox + .indexer_rest_client + .node_stats() + .indexing() + .await + .unwrap(); + assert_eq!(indexing_service_counters.num_running_pipelines, 0); + } + + // Create index + sandbox + .indexer_rest_client + .indexes() + .create( + r#" + version: 0.8 + index_id: my-updatable-index + doc_mapping: + field_mappings: + - name: title + type: text + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + search_settings: + default_search_fields: [title] + "#, + quickwit_config::ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + assert!(sandbox + .indexer_rest_client + .node_health() + .is_live() + .await + .unwrap()); + + // Wait until indexing pipelines are started. + sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + + // Check that ingest request send to searcher is forwarded to indexer and thus indexed. + ingest_with_retry( + &sandbox.searcher_rest_client, + "my-updatable-index", + ingest_json!({"title": "first", "body": "first record"}), + CommitType::Auto, + ) + .await + .unwrap(); + // Wait until split is commited and search. + tokio::time::sleep(Duration::from_secs(4)).await; + // No hit because default_search_fields covers "title" only + let search_response_no_hit = sandbox + .searcher_rest_client + .search( + "my-updatable-index", + SearchRequestQueryString { + query: "record".to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(search_response_no_hit.num_hits, 0); + // Update index to also search "body" by default, search should now have 1 hit + sandbox + .searcher_rest_client + .indexes() + .update( + "my-updatable-index", + IndexUpdates { + search_settings: SearchSettings { + default_search_fields: vec!["title".to_string(), "body".to_string()], + }, + retention_policy_opt: None, + }, + ) + .await + .unwrap(); + let search_response_no_hit = sandbox + .searcher_rest_client + .search( + "my-updatable-index", + SearchRequestQueryString { + query: "record".to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(search_response_no_hit.num_hits, 1); + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs index b1546b8d92a..76322012b30 100644 --- a/quickwit/quickwit-integration-tests/src/tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -19,3 +19,4 @@ mod basic_tests; mod index_tests; +mod index_update_tests; diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 92bb05e1a2a..db01e9c48b8 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -52,6 +52,7 @@ pub use metastore::{ IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt, + UpdateIndexRequestExt, }; pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; pub use metastore_resolver::MetastoreResolver; diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 4662b8cb6c9..5e4928794c2 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -35,7 +35,8 @@ use quickwit_proto::metastore::{ ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -116,6 +117,14 @@ impl MetastoreService for ControlPlaneMetastore { // Other metastore API calls. + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let response = self.metastore.update_index(request).await?; + Ok(response) + } + async fn index_metadata( &mut self, request: IndexMetadataRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 17b39f046da..5f7f91c81e0 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,7 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, @@ -213,6 +213,20 @@ impl FileBackedIndex { &self.metadata } + /// Replaces the search settings in the index config, returning whether a mutation occurred. + pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { + let is_mutation = self.metadata.index_config.search_settings != search_settings; + self.metadata.index_config.search_settings = search_settings; + is_mutation + } + + /// Replaces the retention policy in the index config, returning whether a mutation occurred. + pub fn set_retention_policy(&mut self, retention_policy_opt: Option) -> bool { + let is_mutation = self.metadata.index_config.retention_policy_opt != retention_policy_opt; + self.metadata.index_config.retention_policy_opt = retention_policy_opt; + is_mutation + } + /// Stages a single split. /// /// If a split already exists and is in the [SplitState::Staged] state, @@ -492,7 +506,7 @@ impl FileBackedIndex { } /// Deletes the source. Returns whether a mutation occurred. - pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult { + pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<()> { self.metadata.delete_source(source_id) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 3843a580d81..a0ec0f51264 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -55,7 +55,7 @@ use quickwit_proto::metastore::{ ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, - StageSplitsRequest, ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid}; @@ -73,7 +73,8 @@ use self::store_operations::{delete_index, index_exists, load_index, put_index}; use super::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, - PublishSplitsRequestExt, StageSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, + PublishSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, + STREAM_SPLITS_CHUNK_SIZE, }; use crate::checkpoint::IndexCheckpointDelta; use crate::{IndexMetadata, ListSplitsQuery, MetastoreServiceExt, Split, SplitState}; @@ -456,6 +457,28 @@ impl MetastoreService for FileBackedMetastore { Ok(response) } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let search_settings = request.deserialize_search_settings()?; + let retention_policy_opt = request.deserialize_retention_policy()?; + let index_uid = request.index_uid(); + + let metadata = self + .mutate(index_uid, |index| { + let search_settings_mutated = index.set_search_settings(search_settings); + let retention_policy_mutated = index.set_retention_policy(retention_policy_opt); + if search_settings_mutated || retention_policy_mutated { + Ok(MutationOccurred::Yes(index.metadata().clone())) + } else { + Ok(MutationOccurred::No(index.metadata().clone())) + } + }) + .await?; + IndexMetadataResponse::try_from_index_metadata(&metadata) + } + async fn delete_index( &mut self, request: DeleteIndexRequest, @@ -640,9 +663,8 @@ impl MetastoreService for FileBackedMetastore { let index_uid = request.index_uid(); self.mutate(index_uid, |index| { - index - .delete_source(&request.source_id) - .map(MutationOccurred::from) + index.delete_source(&request.source_id)?; + Ok(MutationOccurred::Yes(())) }) .await?; Ok(EmptyResponse {}) diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index b07557a0748..501e487fa95 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -126,8 +126,8 @@ impl IndexMetadata { Ok(mutation_occurred) } - /// Deletes a source from the index. Returns whether the index was modified (true). - pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult { + /// Deletes a source from the index. + pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<()> { self.sources.remove(source_id).ok_or_else(|| { MetastoreError::NotFound(EntityKind::Source { index_id: self.index_id().to_string(), @@ -135,7 +135,7 @@ impl IndexMetadata { }) })?; self.checkpoint.remove_source(source_id); - Ok(true) + Ok(()) } } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index d00a3e14c1a..3a231f0029c 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,13 +31,13 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; -use quickwit_config::{IndexConfig, SourceConfig}; +use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, + MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -179,6 +179,56 @@ impl CreateIndexResponseExt for CreateIndexResponse { } } +/// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. +pub trait UpdateIndexRequestExt { + /// Creates a new [`UpdateIndexRequest`] from the different updated fields. + fn try_from_updates( + index_uid: impl Into, + search_settings: &SearchSettings, + retention_policy_opt: &Option, + ) -> MetastoreResult; + + /// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a + /// [`SearchSettings`] object. + fn deserialize_search_settings(&self) -> MetastoreResult; + + /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a + /// [`RetentionPolicy`] object. + fn deserialize_retention_policy(&self) -> MetastoreResult>; +} + +impl UpdateIndexRequestExt for UpdateIndexRequest { + fn try_from_updates( + index_uid: impl Into, + search_settings: &SearchSettings, + retention_policy_opt: &Option, + ) -> MetastoreResult { + let search_settings_json = serde_utils::to_json_str(&search_settings)?; + let retention_policy_json = retention_policy_opt + .as_ref() + .map(serde_utils::to_json_str) + .transpose()?; + + let update_request = UpdateIndexRequest { + index_uid: Some(index_uid.into()), + search_settings_json, + retention_policy_json, + }; + Ok(update_request) + } + + fn deserialize_search_settings(&self) -> MetastoreResult { + serde_utils::from_json_str(&self.search_settings_json) + } + + fn deserialize_retention_policy(&self) -> MetastoreResult> { + self.retention_policy_json + .as_ref() + .map(|policy| serde_utils::from_json_str(policy)) + .transpose() + } +} + /// Helper trait to build a [`IndexMetadataResponse`] and deserialize its payload. pub trait IndexMetadataResponseExt { /// Creates a new [`IndexMetadataResponse`] from an [`IndexMetadata`]. diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 519e11b11db..9202c24cbea 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -42,7 +42,7 @@ use quickwit_proto::metastore::{ ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId}; @@ -60,12 +60,13 @@ use super::utils::{append_query_filters, establish_connection}; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, }; +use crate::file_backed::MutationOccurred; use crate::metastore::postgres::utils::split_maturity_timestamp; use crate::metastore::{PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE}; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, - MetastoreServiceExt, Split, SplitState, StageSplitsRequestExt, + MetastoreServiceExt, Split, SplitState, StageSplitsRequestExt, UpdateIndexRequestExt, }; /// PostgreSQL metastore implementation. @@ -288,13 +289,14 @@ macro_rules! run_with_tx { }}; } -async fn mutate_index_metadata Result>( +async fn mutate_index_metadata( tx: &mut Transaction<'_, Postgres>, index_uid: IndexUid, mutate_fn: M, -) -> MetastoreResult +) -> MetastoreResult where MetastoreError: From, + M: FnOnce(&mut IndexMetadata) -> Result, E>, { let index_id = &index_uid.index_id; let mut index_metadata = index_metadata(tx, index_id).await?; @@ -303,10 +305,11 @@ where index_id: index_id.to_string(), })); } - let mutation_occurred = mutate_fn(&mut index_metadata)?; - if !mutation_occurred { - return Ok(mutation_occurred); + + if let MutationOccurred::No(()) = mutate_fn(&mut index_metadata)? { + return Ok(index_metadata); } + let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| { MetastoreError::JsonSerializeError { struct_name: "IndexMetadata".to_string(), @@ -329,7 +332,7 @@ where index_id: index_id.to_string(), })); } - Ok(mutation_occurred) + Ok(index_metadata) } #[async_trait] @@ -399,6 +402,30 @@ impl MetastoreService for PostgresqlMetastore { Ok(response) } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let retention_policy_opt = request.deserialize_retention_policy()?; + let search_settings = request.deserialize_search_settings()?; + let index_uid: IndexUid = request.index_uid().clone(); + let updated_metadata = run_with_tx!(self.connection_pool, tx, { + mutate_index_metadata::(tx, index_uid, |index_metadata| { + if index_metadata.index_config.search_settings != search_settings + || index_metadata.index_config.retention_policy_opt != retention_policy_opt + { + index_metadata.index_config.search_settings = search_settings; + index_metadata.index_config.retention_policy_opt = retention_policy_opt; + Ok(MutationOccurred::Yes(())) + } else { + Ok(MutationOccurred::No(())) + } + }) + .await + })?; + IndexMetadataResponse::try_from_index_metadata(&updated_metadata) + } + #[instrument(skip_all, fields(index_id=%request.index_uid()))] async fn delete_index( &mut self, @@ -937,14 +964,10 @@ impl MetastoreService for PostgresqlMetastore { let source_config = request.deserialize_source_config()?; let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata::( - tx, - index_uid, - |index_metadata: &mut IndexMetadata| { - index_metadata.add_source(source_config)?; - Ok(true) - }, - ) + mutate_index_metadata::(tx, index_uid, |index_metadata| { + index_metadata.add_source(source_config)?; + Ok(MutationOccurred::Yes(())) + }) .await?; Ok(()) })?; @@ -959,7 +982,11 @@ impl MetastoreService for PostgresqlMetastore { let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { mutate_index_metadata(tx, index_uid, |index_metadata| { - index_metadata.toggle_source(&request.source_id, request.enable) + if index_metadata.toggle_source(&request.source_id, request.enable)? { + Ok::<_, MetastoreError>(MutationOccurred::Yes(())) + } else { + Ok::<_, MetastoreError>(MutationOccurred::No(())) + } }) .await?; Ok(()) @@ -976,7 +1003,8 @@ impl MetastoreService for PostgresqlMetastore { let source_id = request.source_id.clone(); run_with_tx!(self.connection_pool, tx, { mutate_index_metadata(tx, index_uid.clone(), |index_metadata| { - index_metadata.delete_source(&source_id) + index_metadata.delete_source(&source_id)?; + Ok::<_, MetastoreError>(MutationOccurred::Yes(())) }) .await?; sqlx::query( @@ -1004,7 +1032,11 @@ impl MetastoreService for PostgresqlMetastore { let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { mutate_index_metadata(tx, index_uid, |index_metadata| { - Ok::<_, MetastoreError>(index_metadata.checkpoint.reset_source(&request.source_id)) + if index_metadata.checkpoint.reset_source(&request.source_id) { + Ok::<_, MetastoreError>(MutationOccurred::Yes(())) + } else { + Ok::<_, MetastoreError>(MutationOccurred::No(())) + } }) .await?; Ok(()) diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 4a95effa698..a75ff115297 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -25,11 +25,17 @@ // - list_indexes // - delete_index +use std::collections::BTreeSet; + use quickwit_common::rand::append_random_suffix; -use quickwit_config::{IndexConfig, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID}; +use quickwit_config::{ + IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, +}; +use quickwit_doc_mapper::FieldMappingType; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, StageSplitsRequest, + UpdateIndexRequest, }; use quickwit_proto::types::IndexUid; @@ -37,7 +43,7 @@ use super::DefaultForTest; use crate::tests::cleanup_index; use crate::{ CreateIndexRequestExt, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, - MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, + MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, UpdateIndexRequestExt, }; pub async fn test_metastore_create_index< @@ -78,6 +84,97 @@ pub async fn test_metastore_create_index< cleanup_index(&mut metastore, index_uid).await; } +pub async fn test_metastore_update_index< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let index_id = append_random_suffix("test-update-index"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid = metastore + .create_index(create_index_request.clone()) + .await + .unwrap() + .index_uid() + .clone(); + + let index_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + + // use all fields that are currently not set as default + let current_defaults = BTreeSet::from_iter( + index_metadata + .index_config + .search_settings + .default_search_fields, + ); + let new_search_setting = SearchSettings { + default_search_fields: index_metadata + .index_config + .doc_mapping + .field_mappings + .iter() + .filter(|f| matches!(f.mapping_type, FieldMappingType::Text(..))) + .filter(|f| !current_defaults.contains(&f.name)) + .map(|f| f.name.clone()) + .collect(), + }; + + let new_retention_policy_opt = Some(RetentionPolicy { + retention_period: String::from("3 days"), + evaluation_schedule: String::from("daily"), + }); + assert_ne!( + index_metadata.index_config.retention_policy_opt, new_retention_policy_opt, + "original and updated value are the same, test became inefficient" + ); + + // run same update twice to check idempotence, then None as a corner case check + for loop_retention_policy_opt in [ + new_retention_policy_opt.clone(), + new_retention_policy_opt.clone(), + None, + ] { + let index_update = UpdateIndexRequest::try_from_updates( + index_uid.clone(), + &new_search_setting, + &loop_retention_policy_opt, + ) + .unwrap(); + let response_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!(response_metadata.index_uid, index_uid); + assert_eq!( + response_metadata.index_config.search_settings, + new_search_setting + ); + assert_eq!( + response_metadata.index_config.retention_policy_opt, + loop_retention_policy_opt + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!(response_metadata, updated_metadata); + } + + cleanup_index(&mut metastore, index_uid).await; +} + pub async fn test_metastore_create_index_with_sources< MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, >() { diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 70b5cb35352..3caf221a569 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -186,6 +186,12 @@ macro_rules! metastore_test_suite { $crate::tests::index::test_metastore_create_index::<$metastore_type>().await; } + #[tokio::test] + async fn test_metastore_update_index() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; + } + #[tokio::test] async fn test_metastore_create_index_with_sources() { let _ = tracing_subscriber::fmt::try_init(); diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index e7065316f28..9c3d7e9a6a2 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -96,6 +96,9 @@ service MetastoreService { // An error will occur if an index that already exists in the storage is specified. rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse); + // Update an index. + rpc UpdateIndex(UpdateIndexRequest) returns (IndexMetadataResponse); + // Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. rpc IndexMetadata(IndexMetadataRequest) returns (IndexMetadataResponse); @@ -199,6 +202,12 @@ message CreateIndexResponse { string index_metadata_json = 2; } +message UpdateIndexRequest { + quickwit.common.IndexUid index_uid = 1; + string search_settings_json = 2; + optional string retention_policy_json = 3; +} + message ListIndexesMetadataRequest { reserved 1; // List of patterns an index should match or not match to get considered diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index e8fd10dea03..2d27a262d23 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -23,6 +23,17 @@ pub struct CreateIndexResponse { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateIndexRequest { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, + #[prost(string, tag = "2")] + pub search_settings_json: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub retention_policy_json: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListIndexesMetadataRequest { /// List of patterns an index should match or not match to get considered /// An index must match at least one positive pattern (a pattern not starting @@ -508,6 +519,11 @@ impl RpcName for CreateIndexRequest { "create_index" } } +impl RpcName for UpdateIndexRequest { + fn rpc_name() -> &'static str { + "update_index" + } +} impl RpcName for IndexMetadataRequest { fn rpc_name() -> &'static str { "index_metadata" @@ -652,6 +668,11 @@ pub trait MetastoreService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync &mut self, request: CreateIndexRequest, ) -> crate::metastore::MetastoreResult; + /// Update an index. + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult; /// Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. async fn index_metadata( &mut self, @@ -892,6 +913,12 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.create_index(request).await } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.update_index(request).await + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -1070,6 +1097,12 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.create_index(request).await } + async fn update_index( + &mut self, + request: super::UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.update_index(request).await + } async fn index_metadata( &mut self, request: super::IndexMetadataRequest, @@ -1257,6 +1290,22 @@ impl tower::Service for Box { Box::pin(fut) } } +impl tower::Service for Box { + type Response = IndexMetadataResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: UpdateIndexRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.update_index(request).await }; + Box::pin(fut) + } +} impl tower::Service for Box { type Response = IndexMetadataResponse; type Error = crate::metastore::MetastoreError; @@ -1682,6 +1731,11 @@ struct MetastoreServiceTowerServiceStack { CreateIndexResponse, crate::metastore::MetastoreError, >, + update_index_svc: quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, index_metadata_svc: quickwit_common::tower::BoxService< IndexMetadataRequest, IndexMetadataResponse, @@ -1818,6 +1872,7 @@ impl Clone for MetastoreServiceTowerServiceStack { Self { inner: self.inner.clone(), create_index_svc: self.create_index_svc.clone(), + update_index_svc: self.update_index_svc.clone(), index_metadata_svc: self.index_metadata_svc.clone(), list_indexes_metadata_svc: self.list_indexes_metadata_svc.clone(), delete_index_svc: self.delete_index_svc.clone(), @@ -1859,6 +1914,12 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.create_index_svc.ready().await?.call(request).await } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.update_index_svc.ready().await?.call(request).await + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -2032,6 +2093,16 @@ type CreateIndexLayer = quickwit_common::tower::BoxLayer< CreateIndexResponse, crate::metastore::MetastoreError, >; +type UpdateIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, +>; type IndexMetadataLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< IndexMetadataRequest, @@ -2295,6 +2366,7 @@ type DeleteIndexTemplatesLayer = quickwit_common::tower::BoxLayer< #[derive(Debug, Default)] pub struct MetastoreServiceTowerLayerStack { create_index_layers: Vec, + update_index_layers: Vec, index_metadata_layers: Vec, list_indexes_metadata_layers: Vec, delete_index_layers: Vec, @@ -2350,6 +2422,31 @@ impl MetastoreServiceTowerLayerStack { crate::metastore::MetastoreError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + UpdateIndexRequest, + Response = IndexMetadataResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< IndexMetadataRequest, @@ -3019,6 +3116,8 @@ impl MetastoreServiceTowerLayerStack { { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.update_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.index_metadata_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_indexes_metadata_layers @@ -3092,6 +3191,25 @@ impl MetastoreServiceTowerLayerStack { self.create_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_update_index_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + UpdateIndexRequest, + Response = IndexMetadataResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.update_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_index_metadata_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -3671,6 +3789,14 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(boxed_instance.clone()), |svc, layer| layer.layer(svc), ); + let update_index_svc = self + .update_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); let index_metadata_svc = self .index_metadata_layers .into_iter() @@ -3882,6 +4008,7 @@ impl MetastoreServiceTowerLayerStack { let tower_svc_stack = MetastoreServiceTowerServiceStack { inner: boxed_instance.clone(), create_index_svc, + update_index_svc, index_metadata_svc, list_indexes_metadata_svc, delete_index_svc, @@ -3990,6 +4117,12 @@ where Error = crate::metastore::MetastoreError, Future = BoxFuture, > + + tower::Service< + UpdateIndexRequest, + Response = IndexMetadataResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture, + > + tower::Service< IndexMetadataRequest, Response = IndexMetadataResponse, @@ -4174,6 +4307,12 @@ where ) -> crate::metastore::MetastoreResult { self.call(request).await } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.call(request).await + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -4390,6 +4529,19 @@ where CreateIndexRequest::rpc_name(), )) } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .update_index(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + UpdateIndexRequest::rpc_name(), + )) + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -4778,6 +4930,17 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn update_index( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .clone() + .update_index(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } async fn index_metadata( &self, request: tonic::Request, @@ -5232,6 +5395,34 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Update an index. + pub async fn update_index( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/UpdateIndex", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.metastore.MetastoreService", "UpdateIndex"), + ); + self.inner.unary(req, path, codec).await + } /// Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. pub async fn index_metadata( &mut self, @@ -6008,6 +6199,14 @@ pub mod metastore_service_grpc_server { tonic::Response, tonic::Status, >; + /// Update an index. + async fn update_index( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. async fn index_metadata( &self, @@ -6360,6 +6559,52 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/UpdateIndex" => { + #[allow(non_camel_case_types)] + struct UpdateIndexSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for UpdateIndexSvc { + type Response = super::IndexMetadataResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).update_index(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = UpdateIndexSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.metastore.MetastoreService/IndexMetadata" => { #[allow(non_camel_case_types)] struct IndexMetadataSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index 0f70f8f799a..7e8416e56d5 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -68,6 +68,7 @@ generate_getters! { ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest } diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index a4a8a9c9278..e90267482a8 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -26,7 +26,9 @@ use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; -use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; +use quickwit_serve::{ + IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString, +}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; @@ -352,6 +354,21 @@ impl<'a> IndexClient<'a> { Ok(index_metadata) } + pub async fn update( + &self, + index_id: &str, + index_updates: IndexUpdates, + ) -> Result { + let body = Bytes::from(serde_json::to_string(&index_updates)?); + let path = format!("indexes/{index_id}"); + let response = self + .transport + .send::<()>(Method::PUT, &path, None, None, Some(body), self.timeout) + .await?; + let index_metadata = response.deserialize().await?; + Ok(index_metadata) + } + pub async fn list(&self) -> Result, Error> { let response = self .transport diff --git a/quickwit/quickwit-serve/src/index_api/mod.rs b/quickwit/quickwit-serve/src/index_api/mod.rs index 9b0990ce1f8..ab831526c81 100644 --- a/quickwit/quickwit-serve/src/index_api/mod.rs +++ b/quickwit/quickwit-serve/src/index_api/mod.rs @@ -20,5 +20,5 @@ mod rest_handler; pub use self::rest_handler::{ - index_management_handlers, IndexApi, ListSplitsQueryParams, ListSplitsResponse, + index_management_handlers, IndexApi, IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, }; diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 94e2cd6d6d5..a64ff918540 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -23,18 +23,21 @@ use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, NodeConfig, - SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, + RetentionPolicy, SearchSettings, SourceConfig, SourceParams, CLI_SOURCE_ID, + INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, Split, SplitInfo, SplitState, + UpdateIndexRequestExt, }; use quickwit_proto::metastore::{ DeleteSourceRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest, ToggleSourceRequest, + UpdateIndexRequest, }; use quickwit_proto::types::IndexUid; use quickwit_query::query_ast::{query_ast_from_user_text, QueryAst}; @@ -52,6 +55,7 @@ use crate::with_arg; #[openapi( paths( create_index, + update_index, clear_index, delete_index, list_indexes_metadata, @@ -63,7 +67,7 @@ use crate::with_arg; toggle_source, delete_source, ), - components(schemas(ToggleSource, SplitsForDeletion, IndexStats)) + components(schemas(ToggleSource, SplitsForDeletion, IndexStats, IndexUpdates)) )] pub struct IndexApi; @@ -86,6 +90,7 @@ pub fn index_management_handlers( get_index_metadata_handler(index_service.metastore()) .or(list_indexes_metadata_handler(index_service.metastore())) .or(create_index_handler(index_service.clone(), node_config)) + .or(update_index_handler(index_service.metastore())) .or(clear_index_handler(index_service.clone())) .or(delete_index_handler(index_service.clone())) // Splits handlers @@ -521,6 +526,68 @@ async fn create_index( .await } +/// The body of the index update request. All fields will be replaced in the +/// existing configuration. +#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility +pub struct IndexUpdates { + pub search_settings: SearchSettings, + #[serde(rename = "retention_policy")] + pub retention_policy_opt: Option, +} + +fn update_index_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + warp::path!("indexes" / String) + .and(warp::put()) + .and(json_body()) + .and(with_arg(metastore)) + .then(update_index) + .map(log_failure("failed to update index")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + put, + tag = "Indexes", + path = "/indexes/{index_id}", + request_body = IndexUpdates, + responses( + (status = 200, description = "Successfully updated the index configuration.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +/// Updates an existing index. +/// +/// This endpoint has PUT semantics, which means that all the updatable fields of the index +/// configuration are replaced by the values specified in the request. In particular, omitting an +/// optional field like `retention_policy` will delete the associated configuration. +async fn update_index( + index_id: String, + request: IndexUpdates, + mut metastore: MetastoreServiceClient, +) -> Result { + info!(index_id = %index_id, "update-index"); + let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); + let index_uid: IndexUid = metastore + .index_metadata(index_metadata_request) + .await? + .deserialize_index_metadata()? + .index_uid; + + let update_request = UpdateIndexRequest::try_from_updates( + index_uid, + &request.search_settings, + &request.retention_policy_opt, + )?; + let update_resp = metastore.update_index(update_request).await?; + Ok(update_resp.deserialize_index_metadata()?) +} + fn clear_index_handler( index_service: IndexService, ) -> impl Filter + Clone { @@ -1711,6 +1778,68 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_update_index() { + let mut metastore = metastore_for_test(); + let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured()); + let mut node_config = NodeConfig::for_test(); + node_config.default_index_root_uri = Uri::for_test("file:///default-index-root-uri"); + let index_management_handler = + super::index_management_handlers(index_service, Arc::new(node_config)); + { + let resp = warp::test::request() + .path("/indexes") + .method("POST") + .json(&true) + .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]},"search_settings":{"default_search_fields":["body"]}}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "index_config": { + "search_settings": { + "default_search_fields": ["body"] + } + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + { + let resp = warp::test::request() + .path("/indexes/hdfs-logs") + .method("PUT") + .json(&true) + .body(r#"{"search_settings":{"default_search_fields":["severity_text","body"]}}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "index_config": { + "search_settings": { + "default_search_fields": ["severity_text", "body"] + } + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + // check that the metastore was updated + let index_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string())) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + index_metadata + .index_config + .search_settings + .default_search_fields, + ["severity_text", "body"] + ); + } + #[tokio::test] async fn test_create_source_with_bad_config() { let metastore = metastore_for_test(); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index c14d6885cfb..5cd5b487a93 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -117,7 +117,7 @@ use tracing::{debug, error, info, warn}; use warp::{Filter, Rejection}; pub use crate::build_info::{BuildInfo, RuntimeInfo}; -pub use crate::index_api::{ListSplitsQueryParams, ListSplitsResponse}; +pub use crate::index_api::{IndexUpdates, ListSplitsQueryParams, ListSplitsResponse}; pub use crate::metrics::SERVE_METRICS; use crate::rate_modulator::RateModulator; #[cfg(test)]