From d8ce74e6b0890c1ccbc69df7f6e5b110f1c808f7 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:28 +0000 Subject: [PATCH 01/13] Add update endpoint with docs and unit tests Missing integration test --- docs/configuration/index-config.md | 3 +- docs/reference/rest-api.md | 56 +++- quickwit/quickwit-metastore/src/lib.rs | 1 + .../src/metastore/control_plane_metastore.rs | 11 +- .../file_backed/file_backed_index/mod.rs | 5 + .../src/metastore/file_backed/mod.rs | 30 ++- .../quickwit-metastore/src/metastore/mod.rs | 56 +++- .../src/metastore/postgres/metastore.rs | 39 ++- .../quickwit-metastore/src/tests/index.rs | 89 ++++++- quickwit/quickwit-metastore/src/tests/mod.rs | 6 + .../protos/quickwit/metastore.proto | 9 + .../codegen/quickwit/quickwit.metastore.rs | 245 ++++++++++++++++++ quickwit/quickwit-proto/src/getters.rs | 1 + .../src/index_api/rest_handler.rs | 101 +++++++- 14 files changed, 635 insertions(+), 17 deletions(-) diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index 50da9d5851c..5a3483f2827 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -12,8 +12,9 @@ In addition to the `index_id`, the index configuration lets you define five item - The **doc mapping**: it defines how a document and the fields it contains are stored and indexed for a given index. - The **indexing settings**: it defines the timestamp field used for sharding, and some more advanced parameters like the merge policy. - The **search settings**: it defines the default search fields `default_search_fields`, a list of fields that Quickwit will search into if the user query does not explicitly target a field. +- The **retention policy**: it defines how long Quickwit should keep the indexed data. If not specified, the data is stored forever. -Configuration is set at index creation and cannot be modified with the current version of Quickwit. +In general, configuration is set at index creation and cannot be modified. Starting Quickwit 0.9, the search setttings and retention policy can be changed using the update endpoint. ## Config file format diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index b58cdafe66d..03125de3c47 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -223,13 +223,13 @@ The response is a JSON object, and the content type is `application/json; charse POST api/v1/indexes ``` -Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json`) and YAML `content-type: application/yaml`. +Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. #### POST payload -| Variable | Type | Description | Default value | -|-----------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| -| `version` | `String` | Config format version, use the same as your Quickwit version. (mandatory) | | +| Variable | Type | Description | Default value | +|---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| `version` | `String` | Config format version, use the same as your Quickwit version. (mandatory) | | | `index_id` | `String` | Index ID, see its [validation rules](../configuration/index-config.md#index-id) on identifiers. (mandatory) | | | `index_uri` | `String` | Defines where the index files are stored. This parameter expects a [storage URI](../configuration/storage-config.md#storage-uris). | `{default_index_root_uri}/{index_id}` | | `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping) (mandatory) | | @@ -301,8 +301,52 @@ curl -XPOST http://0.0.0.0:8080/api/v1/indexes --data @index_config.json -H "Con The response is the index metadata of the created index, and the content type is `application/json; charset=UTF-8.` -| Field | Description | Type | -|----------------------|-------------------------------------------|:---------------------:| +| Field | Description | Type | +|----------------------|-----------------------------------------|:---------------------:| +| `index_config` | The posted index config. | `IndexConfig` | +| `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | +| `create_timestamp` | Index creation timestamp | `number` | +| `sources` | List of the index sources configurations. | `Array` | + + +### Update an index + +``` +PUT api/v1/indexes/ +``` + +Update an index with the updatables parts of the `IndexConfig` payload. Note that this follows the PUT semantics and not PATCH, so all the fields must be specified and Unlike the create endpoint, this API accepts JSON only. + +#### PUT payload + +| Variable | Type | Description | Default value | +|---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | +| `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | + + +**Payload Example** + +curl -XPUT http://0.0.0.0:8080/api/v1/indexes --data @index_update.json -H "Content-Type: application/json" + +```json title="index_update.json +{ + "search_settings": { + "default_search_fields": ["body"] + }, + "retention": { + "period": "3 days", + "schedule": "@daily" + } +} +``` + +#### Response + +The response is the index metadata of the updated index, and the content type is `application/json; charset=UTF-8.` + +| Field | Description | Type | +|----------------------|-----------------------------------------|:---------------------:| | `index_config` | The posted index config. | `IndexConfig` | | `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | | `create_timestamp` | Index creation timestamp | `number` | diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 92bb05e1a2a..db01e9c48b8 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -52,6 +52,7 @@ pub use metastore::{ IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt, + UpdateIndexRequestExt, }; pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; pub use metastore_resolver::MetastoreResolver; diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 4662b8cb6c9..5e4928794c2 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -35,7 +35,8 @@ use quickwit_proto::metastore::{ ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -116,6 +117,14 @@ impl MetastoreService for ControlPlaneMetastore { // Other metastore API calls. + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let response = self.metastore.update_index(request).await?; + Ok(response) + } + async fn index_metadata( &mut self, request: IndexMetadataRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 17b39f046da..c2b35f08e76 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -213,6 +213,11 @@ impl FileBackedIndex { &self.metadata } + /// Mutable ref to index metadata. + pub fn metadata_mut(&mut self) -> &mut IndexMetadata { + &mut self.metadata + } + /// Stages a single split. /// /// If a split already exists and is in the [SplitState::Staged] state, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 3843a580d81..035d916a33c 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -55,7 +55,7 @@ use quickwit_proto::metastore::{ ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, - StageSplitsRequest, ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid}; @@ -73,7 +73,8 @@ use self::store_operations::{delete_index, index_exists, load_index, put_index}; use super::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, - PublishSplitsRequestExt, StageSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, + PublishSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, + STREAM_SPLITS_CHUNK_SIZE, }; use crate::checkpoint::IndexCheckpointDelta; use crate::{IndexMetadata, ListSplitsQuery, MetastoreServiceExt, Split, SplitState}; @@ -456,6 +457,31 @@ impl MetastoreService for FileBackedMetastore { Ok(response) } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let search_settings = request.deserialize_search_settings()?; + let retention_policy_opt = request.deserialize_retention_policy()?; + let index_uid = request.index_uid(); + + let metadata = self + .mutate(index_uid, |index| { + let metadata = index.metadata_mut(); + if metadata.index_config.search_settings != search_settings + || metadata.index_config.retention_policy_opt != retention_policy_opt + { + metadata.index_config.search_settings = search_settings; + metadata.index_config.retention_policy_opt = retention_policy_opt; + Ok(MutationOccurred::Yes(metadata.clone())) + } else { + Ok(MutationOccurred::No(metadata.clone())) + } + }) + .await?; + IndexMetadataResponse::try_from_index_metadata(&metadata) + } + async fn delete_index( &mut self, request: DeleteIndexRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index d00a3e14c1a..671748f0a0e 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,13 +31,13 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; -use quickwit_config::{IndexConfig, SourceConfig}; +use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, + MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -179,6 +179,58 @@ impl CreateIndexResponseExt for CreateIndexResponse { } } +/// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. +pub trait UpdateIndexRequestExt { + /// Updates a new [`UpdateIndexRequest`] from an [`IndexConfig`]. + fn try_from_updates( + index_uid: impl Into, + search_settings: &SearchSettings, + retention_policy_opt: &Option, + ) -> MetastoreResult; + + /// Deserializes the `index_config_json` field of a [`UpdateIndexRequest`] into an + /// [`IndexConfig`]. + fn deserialize_search_settings(&self) -> MetastoreResult; + + /// Deserializes the `source_configs_json` field of a [`UpdateIndexRequest`] into an + /// `Vec` of [`SourceConfig`]. + fn deserialize_retention_policy(&self) -> MetastoreResult>; +} + +impl UpdateIndexRequestExt for UpdateIndexRequest { + fn try_from_updates( + index_uid: impl Into, + search_settings: &SearchSettings, + retention_policy_opt: &Option, + ) -> MetastoreResult { + let search_settings_json = serde_utils::to_json_str(&search_settings)?; + let retention_policy_json = if let Some(retention_policy) = &retention_policy_opt { + Some(serde_utils::to_json_str(retention_policy)?) + } else { + None + }; + + let update_request = UpdateIndexRequest { + index_uid: Some(index_uid.into()), + search_settings_json, + retention_policy_json, + }; + Ok(update_request) + } + + fn deserialize_search_settings(&self) -> MetastoreResult { + serde_utils::from_json_str(&self.search_settings_json) + } + + fn deserialize_retention_policy(&self) -> MetastoreResult> { + if let Some(retention_policy_json) = &self.retention_policy_json { + serde_utils::from_json_str(retention_policy_json).map(Some) + } else { + Ok(None) + } + } +} + /// Helper trait to build a [`IndexMetadataResponse`] and deserialize its payload. pub trait IndexMetadataResponseExt { /// Creates a new [`IndexMetadataResponse`] from an [`IndexMetadata`]. diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 519e11b11db..c2132c2fdad 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -42,7 +42,7 @@ use quickwit_proto::metastore::{ ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId}; @@ -65,7 +65,7 @@ use crate::metastore::{PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE}; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, - MetastoreServiceExt, Split, SplitState, StageSplitsRequestExt, + MetastoreServiceExt, Split, SplitState, StageSplitsRequestExt, UpdateIndexRequestExt, }; /// PostgreSQL metastore implementation. @@ -399,6 +399,41 @@ impl MetastoreService for PostgresqlMetastore { Ok(response) } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let retention_policy_opt = request.deserialize_retention_policy()?; + let search_settings = request.deserialize_search_settings()?; + let index_uid: IndexUid = request.index_uid().clone(); + let mut mutated_metadata_opt = None; + let mutated_metadata_ref = &mut mutated_metadata_opt; + run_with_tx!(self.connection_pool, tx, { + mutate_index_metadata::( + tx, + index_uid, + |index_metadata: &mut IndexMetadata| { + let mutated = if index_metadata.index_config.search_settings != search_settings + || index_metadata.index_config.retention_policy_opt != retention_policy_opt + { + index_metadata.index_config.search_settings = search_settings; + index_metadata.index_config.retention_policy_opt = retention_policy_opt; + true + } else { + false + }; + *mutated_metadata_ref = Some(index_metadata.clone()); + Ok(mutated) + }, + ) + .await?; + Ok(()) + })?; + let mutated_metadata = + mutated_metadata_opt.expect("Mutated IndexMetadata should be set by transaction"); + IndexMetadataResponse::try_from_index_metadata(&mutated_metadata) + } + #[instrument(skip_all, fields(index_id=%request.index_uid()))] async fn delete_index( &mut self, diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 4a95effa698..f5e00718906 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -25,11 +25,16 @@ // - list_indexes // - delete_index +use std::vec; + use quickwit_common::rand::append_random_suffix; -use quickwit_config::{IndexConfig, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID}; +use quickwit_config::{ + IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, +}; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, StageSplitsRequest, + UpdateIndexRequest, }; use quickwit_proto::types::IndexUid; @@ -37,7 +42,7 @@ use super::DefaultForTest; use crate::tests::cleanup_index; use crate::{ CreateIndexRequestExt, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, - MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, + MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, UpdateIndexRequestExt, }; pub async fn test_metastore_create_index< @@ -78,6 +83,86 @@ pub async fn test_metastore_create_index< cleanup_index(&mut metastore, index_uid).await; } +pub async fn test_metastore_update_index< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let index_id = append_random_suffix("test-update-index"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid = metastore + .create_index(create_index_request.clone()) + .await + .unwrap() + .index_uid() + .clone(); + + let index_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + + let new_search_setting = SearchSettings { + default_search_fields: vec!["body".to_string(), "owner".to_string()], + }; + assert_ne!( + index_metadata.index_config.search_settings, new_search_setting, + "original and updated value are the same, test became inefficient" + ); + + let new_retention_policy_opt = Some(RetentionPolicy { + retention_period: String::from("3 days"), + evaluation_schedule: String::from("daily"), + }); + assert_ne!( + index_metadata.index_config.retention_policy_opt, new_retention_policy_opt, + "original and updated value are the same, test became inefficient" + ); + + // run same update twice to check indempotence, then None as a corner case check + for loop_retention_policy_opt in [ + new_retention_policy_opt.clone(), + new_retention_policy_opt.clone(), + None, + ] { + let index_update = UpdateIndexRequest::try_from_updates( + index_uid.clone(), + &new_search_setting, + &loop_retention_policy_opt, + ) + .unwrap(); + let response_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!(response_metadata.index_uid, index_uid); + assert_eq!( + response_metadata.index_config.search_settings, + new_search_setting + ); + assert_eq!( + response_metadata.index_config.retention_policy_opt, + loop_retention_policy_opt + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!(response_metadata, updated_metadata); + } + + cleanup_index(&mut metastore, index_uid).await; +} + pub async fn test_metastore_create_index_with_sources< MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, >() { diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 70b5cb35352..3caf221a569 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -186,6 +186,12 @@ macro_rules! metastore_test_suite { $crate::tests::index::test_metastore_create_index::<$metastore_type>().await; } + #[tokio::test] + async fn test_metastore_update_index() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; + } + #[tokio::test] async fn test_metastore_create_index_with_sources() { let _ = tracing_subscriber::fmt::try_init(); diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index e7065316f28..9c3d7e9a6a2 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -96,6 +96,9 @@ service MetastoreService { // An error will occur if an index that already exists in the storage is specified. rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse); + // Update an index. + rpc UpdateIndex(UpdateIndexRequest) returns (IndexMetadataResponse); + // Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. rpc IndexMetadata(IndexMetadataRequest) returns (IndexMetadataResponse); @@ -199,6 +202,12 @@ message CreateIndexResponse { string index_metadata_json = 2; } +message UpdateIndexRequest { + quickwit.common.IndexUid index_uid = 1; + string search_settings_json = 2; + optional string retention_policy_json = 3; +} + message ListIndexesMetadataRequest { reserved 1; // List of patterns an index should match or not match to get considered diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index e8fd10dea03..2d27a262d23 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -23,6 +23,17 @@ pub struct CreateIndexResponse { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateIndexRequest { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, + #[prost(string, tag = "2")] + pub search_settings_json: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub retention_policy_json: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListIndexesMetadataRequest { /// List of patterns an index should match or not match to get considered /// An index must match at least one positive pattern (a pattern not starting @@ -508,6 +519,11 @@ impl RpcName for CreateIndexRequest { "create_index" } } +impl RpcName for UpdateIndexRequest { + fn rpc_name() -> &'static str { + "update_index" + } +} impl RpcName for IndexMetadataRequest { fn rpc_name() -> &'static str { "index_metadata" @@ -652,6 +668,11 @@ pub trait MetastoreService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync &mut self, request: CreateIndexRequest, ) -> crate::metastore::MetastoreResult; + /// Update an index. + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult; /// Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. async fn index_metadata( &mut self, @@ -892,6 +913,12 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.create_index(request).await } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.update_index(request).await + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -1070,6 +1097,12 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.create_index(request).await } + async fn update_index( + &mut self, + request: super::UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.update_index(request).await + } async fn index_metadata( &mut self, request: super::IndexMetadataRequest, @@ -1257,6 +1290,22 @@ impl tower::Service for Box { Box::pin(fut) } } +impl tower::Service for Box { + type Response = IndexMetadataResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: UpdateIndexRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.update_index(request).await }; + Box::pin(fut) + } +} impl tower::Service for Box { type Response = IndexMetadataResponse; type Error = crate::metastore::MetastoreError; @@ -1682,6 +1731,11 @@ struct MetastoreServiceTowerServiceStack { CreateIndexResponse, crate::metastore::MetastoreError, >, + update_index_svc: quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, index_metadata_svc: quickwit_common::tower::BoxService< IndexMetadataRequest, IndexMetadataResponse, @@ -1818,6 +1872,7 @@ impl Clone for MetastoreServiceTowerServiceStack { Self { inner: self.inner.clone(), create_index_svc: self.create_index_svc.clone(), + update_index_svc: self.update_index_svc.clone(), index_metadata_svc: self.index_metadata_svc.clone(), list_indexes_metadata_svc: self.list_indexes_metadata_svc.clone(), delete_index_svc: self.delete_index_svc.clone(), @@ -1859,6 +1914,12 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.create_index_svc.ready().await?.call(request).await } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.update_index_svc.ready().await?.call(request).await + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -2032,6 +2093,16 @@ type CreateIndexLayer = quickwit_common::tower::BoxLayer< CreateIndexResponse, crate::metastore::MetastoreError, >; +type UpdateIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, +>; type IndexMetadataLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< IndexMetadataRequest, @@ -2295,6 +2366,7 @@ type DeleteIndexTemplatesLayer = quickwit_common::tower::BoxLayer< #[derive(Debug, Default)] pub struct MetastoreServiceTowerLayerStack { create_index_layers: Vec, + update_index_layers: Vec, index_metadata_layers: Vec, list_indexes_metadata_layers: Vec, delete_index_layers: Vec, @@ -2350,6 +2422,31 @@ impl MetastoreServiceTowerLayerStack { crate::metastore::MetastoreError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + UpdateIndexRequest, + Response = IndexMetadataResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< IndexMetadataRequest, @@ -3019,6 +3116,8 @@ impl MetastoreServiceTowerLayerStack { { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.update_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.index_metadata_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_indexes_metadata_layers @@ -3092,6 +3191,25 @@ impl MetastoreServiceTowerLayerStack { self.create_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_update_index_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + UpdateIndexRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + UpdateIndexRequest, + Response = IndexMetadataResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.update_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_index_metadata_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -3671,6 +3789,14 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(boxed_instance.clone()), |svc, layer| layer.layer(svc), ); + let update_index_svc = self + .update_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); let index_metadata_svc = self .index_metadata_layers .into_iter() @@ -3882,6 +4008,7 @@ impl MetastoreServiceTowerLayerStack { let tower_svc_stack = MetastoreServiceTowerServiceStack { inner: boxed_instance.clone(), create_index_svc, + update_index_svc, index_metadata_svc, list_indexes_metadata_svc, delete_index_svc, @@ -3990,6 +4117,12 @@ where Error = crate::metastore::MetastoreError, Future = BoxFuture, > + + tower::Service< + UpdateIndexRequest, + Response = IndexMetadataResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture, + > + tower::Service< IndexMetadataRequest, Response = IndexMetadataResponse, @@ -4174,6 +4307,12 @@ where ) -> crate::metastore::MetastoreResult { self.call(request).await } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.call(request).await + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -4390,6 +4529,19 @@ where CreateIndexRequest::rpc_name(), )) } + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .update_index(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + UpdateIndexRequest::rpc_name(), + )) + } async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -4778,6 +4930,17 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn update_index( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .clone() + .update_index(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } async fn index_metadata( &self, request: tonic::Request, @@ -5232,6 +5395,34 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Update an index. + pub async fn update_index( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/UpdateIndex", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.metastore.MetastoreService", "UpdateIndex"), + ); + self.inner.unary(req, path, codec).await + } /// Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. pub async fn index_metadata( &mut self, @@ -6008,6 +6199,14 @@ pub mod metastore_service_grpc_server { tonic::Response, tonic::Status, >; + /// Update an index. + async fn update_index( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. async fn index_metadata( &self, @@ -6360,6 +6559,52 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/UpdateIndex" => { + #[allow(non_camel_case_types)] + struct UpdateIndexSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for UpdateIndexSvc { + type Response = super::IndexMetadataResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).update_index(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = UpdateIndexSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.metastore.MetastoreService/IndexMetadata" => { #[allow(non_camel_case_types)] struct IndexMetadataSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index 0f70f8f799a..7e8416e56d5 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -68,6 +68,7 @@ generate_getters! { ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest } diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 94e2cd6d6d5..28dfc280a9b 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -23,18 +23,21 @@ use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, NodeConfig, - SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, + RetentionPolicy, SearchSettings, SourceConfig, SourceParams, CLI_SOURCE_ID, + INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, Split, SplitInfo, SplitState, + UpdateIndexRequestExt, }; use quickwit_proto::metastore::{ DeleteSourceRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest, ToggleSourceRequest, + UpdateIndexRequest, }; use quickwit_proto::types::IndexUid; use quickwit_query::query_ast::{query_ast_from_user_text, QueryAst}; @@ -86,6 +89,7 @@ pub fn index_management_handlers( get_index_metadata_handler(index_service.metastore()) .or(list_indexes_metadata_handler(index_service.metastore())) .or(create_index_handler(index_service.clone(), node_config)) + .or(update_index_handler(index_service.metastore())) .or(clear_index_handler(index_service.clone())) .or(delete_index_handler(index_service.clone())) // Splits handlers @@ -521,6 +525,64 @@ async fn create_index( .await } +/// The body of the index update request +/// +/// Remove #[serde(deny_unknown_fields)] when adding new fields to allow to ensure forward +/// compatibility. +#[derive(Deserialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct IndexUpdates { + pub search_settings: SearchSettings, + pub retention_policy_opt: Option, +} + +fn update_index_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + warp::path!("indexes" / String) + .and(warp::put()) + .and(json_body()) + .and(with_arg(metastore)) + .then(update_index) + .map(log_failure("failed to update index")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + put, + tag = "Indexes", + path = "/indexes/{index_id}", + request_body = UpdateIndexRequest, + responses( + (status = 200, description = "Successfully marked splits for deletion.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +async fn update_index( + index_id: String, + request: IndexUpdates, + mut metastore: MetastoreServiceClient, +) -> Result { + info!(index_id = %index_id, "update-index"); + let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); + let index_uid: IndexUid = metastore + .index_metadata(index_metadata_request) + .await? + .deserialize_index_metadata()? + .index_uid; + + let update_request = UpdateIndexRequest::try_from_updates( + index_uid, + &request.search_settings, + &request.retention_policy_opt, + )?; + let update_resp = metastore.update_index(update_request).await?; + Ok(update_resp.deserialize_index_metadata()?) +} + fn clear_index_handler( index_service: IndexService, ) -> impl Filter + Clone { @@ -1711,6 +1773,43 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_update_index() { + let metastore = metastore_for_test(); + let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured()); + let mut node_config = NodeConfig::for_test(); + node_config.default_index_root_uri = Uri::for_test("file:///default-index-root-uri"); + let index_management_handler = + super::index_management_handlers(index_service, Arc::new(node_config)); + { + let resp = warp::test::request() + .path("/indexes") + .method("POST") + .json(&true) + .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]},"search_settings":{"default_search_fields":["body"]}}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({"default_search_fields":["body"]}); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + { + let resp = warp::test::request() + .path("/indexes/hdfs-logs") + .method("PUT") + .json(&true) + .body(r#"{"search_settings":{"default_search_fields":["severity_text","body"]}}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = + serde_json::json!({"default_search_fields":["severity_text","body"]}); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + } + #[tokio::test] async fn test_create_source_with_bad_config() { let metastore = metastore_for_test(); From 91149f155a2298e1e6a142cd975ee2da595e7790 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:29 +0000 Subject: [PATCH 02/13] Fix unit test --- .../src/index_api/rest_handler.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 28dfc280a9b..db837ddc312 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -529,7 +529,7 @@ async fn create_index( /// /// Remove #[serde(deny_unknown_fields)] when adding new fields to allow to ensure forward /// compatibility. -#[derive(Deserialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] +#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct IndexUpdates { pub search_settings: SearchSettings, @@ -1791,7 +1791,13 @@ mod tests { .await; assert_eq!(resp.status(), 200); let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); - let expected_response_json = serde_json::json!({"default_search_fields":["body"]}); + let expected_response_json = serde_json::json!({ + "index_config": { + "search_settings": { + "default_search_fields": ["body"] + } + } + }); assert_json_include!(actual: resp_json, expected: expected_response_json); } { @@ -1804,8 +1810,13 @@ mod tests { .await; assert_eq!(resp.status(), 200); let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); - let expected_response_json = - serde_json::json!({"default_search_fields":["severity_text","body"]}); + let expected_response_json = serde_json::json!({ + "index_config": { + "search_settings": { + "default_search_fields": ["severity_text", "body"] + } + } + }); assert_json_include!(actual: resp_json, expected: expected_response_json); } } From ed63c5ad85efef1a68210800b3cd6df88e1c838d Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:29 +0000 Subject: [PATCH 03/13] Cli and integration tests --- .../src/{index.rs => index/mod.rs} | 23 +- quickwit/quickwit-cli/src/index/update.rs | 261 ++++++++++++++++++ .../quickwit-config/src/index_config/mod.rs | 2 +- .../src/tests/index_update_tests.rs | 146 ++++++++++ .../src/tests/mod.rs | 1 + .../quickwit-rest-client/src/rest_client.rs | 19 +- quickwit/quickwit-serve/src/index_api/mod.rs | 2 +- quickwit/quickwit-serve/src/lib.rs | 2 +- 8 files changed, 445 insertions(+), 11 deletions(-) rename quickwit/quickwit-cli/src/{index.rs => index/mod.rs} (99%) create mode 100644 quickwit/quickwit-cli/src/index/update.rs create mode 100644 quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index/mod.rs similarity index 99% rename from quickwit/quickwit-cli/src/index.rs rename to quickwit/quickwit-cli/src/index/mod.rs index 6a5fda6bb9d..9743e98f071 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index/mod.rs @@ -54,13 +54,16 @@ use tabled::{Table, Tabled}; use thousands::Separable; use tracing::{debug, Level}; +use self::update::{build_index_update_command, IndexUpdateCliCommand}; use crate::checklist::GREEN_COLOR; use crate::stats::{mean, percentile, std_deviation}; use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE}; +mod update; + pub fn build_index_command() -> Command { Command::new("index") - .about("Manages indexes: creates, deletes, ingests, searches, describes...") + .about("Manages indexes: creates, updates, deletes, ingests, searches, describes...") .args(client_args()) .subcommand( Command::new("create") @@ -74,9 +77,12 @@ pub fn build_index_command() -> Command { .required(false), ]) ) + .subcommand( + build_index_update_command() + ) .subcommand( Command::new("clear") - .display_order(2) + .display_order(3) .alias("clr") .about("Clears an index: deletes all splits and resets checkpoint.") .long_about("Deletes all its splits and resets its checkpoint. This operation is destructive and cannot be undone, proceed with caution.") @@ -88,7 +94,7 @@ pub fn build_index_command() -> Command { ) .subcommand( Command::new("delete") - .display_order(3) + .display_order(4) .alias("del") .about("Deletes an index.") .long_about("Deletes an index. This operation is destructive and cannot be undone, proceed with caution.") @@ -102,7 +108,7 @@ pub fn build_index_command() -> Command { ) .subcommand( Command::new("describe") - .display_order(4) + .display_order(5) .about("Displays descriptive statistics of an index.") .long_about("Displays descriptive statistics of an index. Displayed statistics are: number of published splits, number of documents, splits min/max timestamps, size of splits.") .args(&[ @@ -113,12 +119,12 @@ pub fn build_index_command() -> Command { .subcommand( Command::new("list") .alias("ls") - .display_order(5) + .display_order(6) .about("List indexes.") ) .subcommand( Command::new("ingest") - .display_order(6) + .display_order(7) .about("Ingest NDJSON documents with the ingest API.") .long_about("Reads NDJSON documents from a file or streamed from stdin and sends them into ingest API.") .args(&[ @@ -155,7 +161,7 @@ pub fn build_index_command() -> Command { ) .subcommand( Command::new("search") - .display_order(7) + .display_order(8) .about("Searches an index.") .args(&[ arg!(--index "ID of the target index") @@ -256,6 +262,7 @@ pub enum IndexCliCommand { Ingest(IngestDocsArgs), List(ListIndexesArgs), Search(SearchIndexArgs), + Update(IndexUpdateCliCommand), } impl IndexCliCommand { @@ -278,6 +285,7 @@ impl IndexCliCommand { "ingest" => Self::parse_ingest_args(submatches), "list" => Self::parse_list_args(submatches), "search" => Self::parse_search_args(submatches), + "update" => Ok(Self::Update(IndexUpdateCliCommand::parse_args(matches)?)), _ => bail!("unknown index subcommand `{subcommand}`"), } } @@ -438,6 +446,7 @@ impl IndexCliCommand { Self::Ingest(args) => ingest_docs_cli(args).await, Self::List(args) => list_index_cli(args).await, Self::Search(args) => search_index_cli(args).await, + Self::Update(args) => args.execute().await, } } } diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs new file mode 100644 index 00000000000..aa72159ffbb --- /dev/null +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -0,0 +1,261 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use anyhow::{bail, Context}; +use clap::{arg, ArgMatches, Command}; +use colored::Colorize; +use quickwit_config::{RetentionPolicy, SearchSettings}; +use quickwit_serve::IndexUpdates; +use tracing::debug; + +use crate::checklist::GREEN_COLOR; +use crate::ClientArgs; + +pub fn build_index_update_command() -> Command { + Command::new("update") + .display_order(2) + .about("Updates an index configuration.") + .subcommand( + Command::new("search-settings") + .about("Updates an default search settings.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--"default-search-fields" "Comma separated list of fields that will be searched by default.") + .display_order(2) + .required(false), + ]) + .subcommand( + Command::new("retention-policy") + .about("Set or unset a retention policy.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--"retention-policy-period" "Duration after which splits are dropped.") + .display_order(2) + .required(false), + arg!(--"retention-policy-schedule" "Frequency at which the retention policy is evaluated and applied.") + .display_order(3) + .required(false), + arg!(--"disable-retention-policy" "Disables the retention policy.") + .display_order(4) + .required(false), + ]) + ) + ) +} + +// Structured representation of the retention policy args +#[derive(Debug, Eq, PartialEq)] +pub enum RetentionPolicyUpdate { + Noop, + Update { + period: Option, + schedule: Option, + }, + Disable, +} + +impl RetentionPolicyUpdate { + pub fn apply_update( + self, + retention_policy_opt: Option, + ) -> anyhow::Result> { + match (self, retention_policy_opt) { + (Self::Noop, policy_opt) => Ok(policy_opt), + (Self::Update { period: None, .. }, None) => { + bail!("`--retention-policy-period` is required when creating a retention policy"); + } + ( + Self::Update { + period: None, + schedule, + }, + Some(mut policy), + ) => { + policy.evaluation_schedule = schedule.unwrap_or(policy.evaluation_schedule.clone()); + Ok(Some(policy)) + } + ( + Self::Update { + period: Some(period), + schedule, + }, + None, + ) => Ok(Some(RetentionPolicy { + retention_period: period, + evaluation_schedule: schedule.unwrap_or(RetentionPolicy::default_schedule()), + })), + ( + Self::Update { + period: Some(period), + schedule, + }, + Some(policy), + ) => Ok(Some(RetentionPolicy { + retention_period: period, + evaluation_schedule: schedule.unwrap_or(policy.evaluation_schedule.clone()), + })), + (Self::Disable, _) => Ok(None), + } + } +} + +#[derive(Debug, Eq, PartialEq)] +pub struct RetentionPolicyArgs { + pub client_args: ClientArgs, + pub index_id: String, + pub retention_policy_update: RetentionPolicyUpdate, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct SearchSettingsArgs { + pub client_args: ClientArgs, + pub index_id: String, + pub default_search_fields: Option>, +} + +#[derive(Debug, Eq, PartialEq)] +pub enum IndexUpdateCliCommand { + RetentionPolicy(RetentionPolicyArgs), + SearchSettings(SearchSettingsArgs), +} + +impl IndexUpdateCliCommand { + pub fn parse_args(mut matches: ArgMatches) -> anyhow::Result { + let (subcommand, submatches) = matches + .remove_subcommand() + .context("failed to parse index subcommand")?; + match subcommand.as_str() { + "retention-policy" => Self::parse_update_retention_policy_args(submatches), + "search-settings" => Self::parse_update_search_settings_args(submatches), + _ => bail!("unknown index update subcommand `{subcommand}`"), + } + } + + fn parse_update_retention_policy_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let disable_retention_policy = matches.get_flag("disable-retention-policy"); + let retention_policy_period = matches.remove_one::("retention-policy-period"); + let retention_policy_schedule = matches.remove_one::("retention-policy-schedule"); + + let retention_policy_update = match ( + disable_retention_policy, + retention_policy_period, + retention_policy_schedule, + ) { + (true, Some(_), Some(_)) | (true, None, Some(_)) | (true, Some(_), None) => bail!( + "`--retention-policy-period` and `--retention-policy-schedule` cannot be used \ + together with `--disable-retention-policy`" + ), + (true, None, None) => RetentionPolicyUpdate::Disable, + (false, None, None) => RetentionPolicyUpdate::Noop, + (false, period, schedule) => RetentionPolicyUpdate::Update { period, schedule }, + }; + + Ok(Self::RetentionPolicy(RetentionPolicyArgs { + client_args, + index_id, + retention_policy_update, + })) + } + + fn parse_update_search_settings_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let default_search_fields = matches + .remove_many::("default-search-fields") + .map(|values| values.collect()); + Ok(Self::SearchSettings(SearchSettingsArgs { + client_args, + index_id, + default_search_fields, + })) + } + + pub async fn execute(self) -> anyhow::Result<()> { + match self { + Self::RetentionPolicy(args) => update_retention_policy_cli(args).await, + Self::SearchSettings(args) => update_search_settings_cli(args).await, + } + } +} + +pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-retention-policy"); + println!("❯ Updating index retention policy..."); + let qw_client = args.client_args.client(); + let metadata = qw_client.indexes().get(&args.index_id).await?; + let new_retention_policy_opt = args + .retention_policy_update + .apply_update(metadata.index_config.retention_policy_opt)?; + if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { + println!( + "New retention policy: {}", + serde_json::to_string(&new_retention_policy)? + ); + } else { + println!("Retention policy disabled."); + } + qw_client + .indexes() + .update( + &args.index_id, + IndexUpdates { + retention_policy_opt: new_retention_policy_opt, + search_settings: metadata.index_config.search_settings, + }, + ) + .await?; + Ok(()) +} + +pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-search-settings"); + println!("❯ Updating index search settings..."); + let qw_client = args.client_args.client(); + let metadata = qw_client.indexes().get(&args.index_id).await?; + let search_settings = SearchSettings { + default_search_fields: args.default_search_fields.unwrap_or(vec![]), + ..metadata.index_config.search_settings + }; + println!( + "New search settings: {}", + serde_json::to_string(&search_settings)? + ); + qw_client + .indexes() + .update( + &args.index_id, + IndexUpdates { + retention_policy_opt: metadata.index_config.retention_policy_opt, + search_settings, + }, + ) + .await?; + println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + Ok(()) +} diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index e0807820009..2aa295e49b6 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -239,7 +239,7 @@ pub struct RetentionPolicy { } impl RetentionPolicy { - fn default_schedule() -> String { + pub fn default_schedule() -> String { "hourly".to_string() } diff --git a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs new file mode 100644 index 00000000000..5ddc8a034ec --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs @@ -0,0 +1,146 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashSet; +use std::time::Duration; + +use quickwit_config::service::QuickwitService; +use quickwit_config::SearchSettings; +use quickwit_rest_client::rest_client::CommitType; +use quickwit_serve::{IndexUpdates, SearchRequestQueryString}; +use serde_json::json; + +use crate::ingest_json; +use crate::test_utils::{ingest_with_retry, ClusterSandbox}; + +#[tokio::test] +async fn test_update_on_multi_nodes_cluster() { + quickwit_common::setup_logging_for_tests(); + let nodes_services = vec![ + HashSet::from_iter([QuickwitService::Searcher]), + HashSet::from_iter([QuickwitService::Metastore]), + HashSet::from_iter([QuickwitService::Indexer]), + HashSet::from_iter([QuickwitService::ControlPlane]), + HashSet::from_iter([QuickwitService::Janitor]), + ]; + let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + .await + .unwrap(); + sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); + + { + // Wait for indexer to fully start. + // The starting time is a bit long for a cluster. + tokio::time::sleep(Duration::from_secs(3)).await; + let indexing_service_counters = sandbox + .indexer_rest_client + .node_stats() + .indexing() + .await + .unwrap(); + assert_eq!(indexing_service_counters.num_running_pipelines, 0); + } + + // Create index + sandbox + .indexer_rest_client + .indexes() + .create( + r#" + version: 0.8 + index_id: my-updatable-index + doc_mapping: + field_mappings: + - name: title + type: text + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + search_settings: + default_search_fields: [title] + "#, + quickwit_config::ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + assert!(sandbox + .indexer_rest_client + .node_health() + .is_live() + .await + .unwrap()); + + // Wait until indexing pipelines are started. + sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + + // Check that ingest request send to searcher is forwarded to indexer and thus indexed. + ingest_with_retry( + &sandbox.searcher_rest_client, + "my-updatable-index", + ingest_json!({"title": "first", "body": "first record"}), + CommitType::Auto, + ) + .await + .unwrap(); + // Wait until split is commited and search. + tokio::time::sleep(Duration::from_secs(4)).await; + // No hit because default_search_fields covers "title" only + let search_response_no_hit = sandbox + .searcher_rest_client + .search( + "my-updatable-index", + SearchRequestQueryString { + query: "record".to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(search_response_no_hit.num_hits, 0); + // Update index to also search "body" by default, search should now have 1 hit + sandbox + .searcher_rest_client + .indexes() + .update( + "my-updatable-index", + IndexUpdates { + search_settings: SearchSettings { + default_search_fields: vec!["title".to_string(), "body".to_string()], + }, + retention_policy_opt: None, + }, + ) + .await + .unwrap(); + let search_response_no_hit = sandbox + .searcher_rest_client + .search( + "my-updatable-index", + SearchRequestQueryString { + query: "record".to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(search_response_no_hit.num_hits, 1); + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs index b1546b8d92a..76322012b30 100644 --- a/quickwit/quickwit-integration-tests/src/tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -19,3 +19,4 @@ mod basic_tests; mod index_tests; +mod index_update_tests; diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index a4a8a9c9278..e90267482a8 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -26,7 +26,9 @@ use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; -use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; +use quickwit_serve::{ + IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString, +}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; @@ -352,6 +354,21 @@ impl<'a> IndexClient<'a> { Ok(index_metadata) } + pub async fn update( + &self, + index_id: &str, + index_updates: IndexUpdates, + ) -> Result { + let body = Bytes::from(serde_json::to_string(&index_updates)?); + let path = format!("indexes/{index_id}"); + let response = self + .transport + .send::<()>(Method::PUT, &path, None, None, Some(body), self.timeout) + .await?; + let index_metadata = response.deserialize().await?; + Ok(index_metadata) + } + pub async fn list(&self) -> Result, Error> { let response = self .transport diff --git a/quickwit/quickwit-serve/src/index_api/mod.rs b/quickwit/quickwit-serve/src/index_api/mod.rs index 9b0990ce1f8..ab831526c81 100644 --- a/quickwit/quickwit-serve/src/index_api/mod.rs +++ b/quickwit/quickwit-serve/src/index_api/mod.rs @@ -20,5 +20,5 @@ mod rest_handler; pub use self::rest_handler::{ - index_management_handlers, IndexApi, ListSplitsQueryParams, ListSplitsResponse, + index_management_handlers, IndexApi, IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, }; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index c14d6885cfb..5cd5b487a93 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -117,7 +117,7 @@ use tracing::{debug, error, info, warn}; use warp::{Filter, Rejection}; pub use crate::build_info::{BuildInfo, RuntimeInfo}; -pub use crate::index_api::{ListSplitsQueryParams, ListSplitsResponse}; +pub use crate::index_api::{IndexUpdates, ListSplitsQueryParams, ListSplitsResponse}; pub use crate::metrics::SERVE_METRICS; use crate::rate_modulator::RateModulator; #[cfg(test)] From ea4720d506c96f98b56f7f823515460c32864752 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:29 +0000 Subject: [PATCH 04/13] Add cli for update --- docs/reference/cli.md | 88 ++++++++++- .../quickwit-cli/src/generate_markdown.rs | 8 +- quickwit/quickwit-cli/src/index/mod.rs | 8 +- quickwit/quickwit-cli/src/index/update.rs | 142 ++++++------------ quickwit/quickwit-cli/src/main.rs | 30 ++++ quickwit/quickwit-cli/tests/cli.rs | 70 ++++++++- 6 files changed, 241 insertions(+), 105 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 6b289fb7024..ad73d7a59af 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -116,7 +116,7 @@ quickwit run | Option | Description | Default | |-----------------|-------------|--------:| | `--config` | Config file location | `config/quickwit.yaml` | -| `--service` | Services (indexer,searcher,janitor,metastore or control-plane) to run. If unspecified, all the supported services are started. | | +| `--service` | Services (`indexer`, `searcher`, `metastore`, `control-plane`, or `janitor`) to run. If unspecified, all the supported services are started. | | *Examples* @@ -141,7 +141,7 @@ curl "http://127.0.0.1:7280/api/v1/wikipedia/search?query=barack+obama" ``` ## index -Manages indexes: creates, deletes, ingests, searches, describes... +Manages indexes: creates, updates, deletes, ingests, searches, describes... ### index create @@ -180,6 +180,51 @@ quickwit index create --endpoint=http://127.0.0.1:7280 --index-config wikipedia_ ``` +### index update + +`quickwit index update [args]` +#### index update search-settings + +Updates default search settings. +`quickwit index update search-settings [args]` + +*Synopsis* + +```bash +quickwit index update search-settings + --index + [--default-search-fields ] +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | +| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. If not specified, default fields are removed and queries without target field will fail. Space-separated list, e.g. "field1 field2". | +#### index update retention-policy + +Configure or disable the retention policy. +`quickwit index update retention-policy [args]` + +*Synopsis* + +```bash +quickwit index update retention-policy + --index + [--period ] + [--schedule ] + [--disable] +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | +| `--period` | Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...) | +| `--schedule` | Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...). | +| `--disable` | Disable the retention policy. Old indexed data will not be cleaned up anymore. | ### index clear Clears an index: deletes all splits and resets checkpoint. @@ -661,8 +706,8 @@ quickwit split list | Option | Description | |-----------------|-------------| | `--index` | Target index ID | -| `--offset` | Number of splits to skip | -| `--limit` | Maximum number of splits to retrieve | +| `--offset` | Number of splits to skip. | +| `--limit` | Maximum number of splits to retrieve. | | `--states` | Selects the splits whose states are included in this comma-separated list of states. Possible values are `staged`, `published`, and `marked`. | | `--create-date` | Selects the splits whose creation dates are before this date. | | `--start-date` | Selects the splits that contain documents after this date (time-series indexes only). | @@ -742,6 +787,41 @@ quickwit tool local-ingest | `--overwrite` | Overwrites pre-existing index. | | | `--transform-script` | VRL program to transform docs before ingesting. | | | `--keep-cache` | Does not clear local cache directory upon completion. | | +### tool local-search + +Searches an index locally. +`quickwit tool local-search [args]` + +*Synopsis* + +```bash +quickwit tool local-search + --index + --query + [--aggregation ] + [--max-hits ] + [--start-offset ] + [--search-fields ] + [--snippet-fields ] + [--start-timestamp ] + [--end-timestamp ] + [--sort-by-field ] +``` + +*Options* + +| Option | Description | Default | +|-----------------|-------------|--------:| +| `--index` | ID of the target index | | +| `--query` | Query expressed in natural query language ((barack AND obama) OR "president of united states"). Learn more on https://quickwit.io/docs/reference/search-language. | | +| `--aggregation` | JSON serialized aggregation request in tantivy/elasticsearch format. | | +| `--max-hits` | Maximum number of hits returned. | `20` | +| `--start-offset` | Offset in the global result set of the first hit returned. | `0` | +| `--search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field in the query. It overrides the default search fields defined in the index config. Space-separated list, e.g. "field1 field2". | | +| `--snippet-fields` | List of fields that Quickwit will return snippet highlight on. Space-separated list, e.g. "field1 field2". | | +| `--start-timestamp` | Filters out documents before that timestamp (time-series indexes only). | | +| `--end-timestamp` | Filters out documents after that timestamp (time-series indexes only). | | +| `--sort-by-field` | Sort by field. | | ### tool extract-split Downloads and extracts a split to a directory. diff --git a/quickwit/quickwit-cli/src/generate_markdown.rs b/quickwit/quickwit-cli/src/generate_markdown.rs index fe22a753ed1..024486e4b73 100644 --- a/quickwit/quickwit-cli/src/generate_markdown.rs +++ b/quickwit/quickwit-cli/src/generate_markdown.rs @@ -43,11 +43,13 @@ fn markdown_for_subcommand( subcommand: &Command, command_group: Vec, doc_extensions: &toml::Value, + level: usize, ) { let subcommand_name = subcommand.get_name(); let command_name = format!("{} {}", command_group.join(" "), subcommand_name); - println!("### {command_name}\n"); + let header_level = "#".repeat(level); + println!("{header_level} {command_name}\n"); let subcommand_ext: Option<&Value> = { let mut val_opt: Option<&Value> = doc_extensions.get(command_group[0].to_string()); @@ -201,14 +203,14 @@ fn generate_markdown_from_clap(command: &Command) { .filter(|subcommand| !excluded_doc_commands.contains(&subcommand.get_name())) { let commands = vec![command.get_name().to_string()]; - markdown_for_subcommand(subcommand, commands, &doc_extensions); + markdown_for_subcommand(subcommand, commands, &doc_extensions, 3); for subsubcommand in subcommand.get_subcommands() { let commands = vec![ command.get_name().to_string(), subcommand.get_name().to_string(), ]; - markdown_for_subcommand(subsubcommand, commands, &doc_extensions); + markdown_for_subcommand(subsubcommand, commands, &doc_extensions, 4); } } } diff --git a/quickwit/quickwit-cli/src/index/mod.rs b/quickwit/quickwit-cli/src/index/mod.rs index 9743e98f071..f9f4846c506 100644 --- a/quickwit/quickwit-cli/src/index/mod.rs +++ b/quickwit/quickwit-cli/src/index/mod.rs @@ -59,7 +59,7 @@ use crate::checklist::GREEN_COLOR; use crate::stats::{mean, percentile, std_deviation}; use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE}; -mod update; +pub mod update; pub fn build_index_command() -> Command { Command::new("index") @@ -78,7 +78,7 @@ pub fn build_index_command() -> Command { ]) ) .subcommand( - build_index_update_command() + build_index_update_command().display_order(2) ) .subcommand( Command::new("clear") @@ -154,7 +154,7 @@ pub fn build_index_command() -> Command { .conflicts_with("wait"), Arg::new("commit-timeout") .long("commit-timeout") - .help("Duration of the commit timeout operation.") + .help("Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting which sets the maximum time before commiting splits after their creation.") .required(false) .global(true), ]) @@ -285,7 +285,7 @@ impl IndexCliCommand { "ingest" => Self::parse_ingest_args(submatches), "list" => Self::parse_list_args(submatches), "search" => Self::parse_search_args(submatches), - "update" => Ok(Self::Update(IndexUpdateCliCommand::parse_args(matches)?)), + "update" => Ok(Self::Update(IndexUpdateCliCommand::parse_args(submatches)?)), _ => bail!("unknown index subcommand `{subcommand}`"), } } diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index aa72159ffbb..d94a3654e66 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -29,101 +29,46 @@ use crate::ClientArgs; pub fn build_index_update_command() -> Command { Command::new("update") - .display_order(2) - .about("Updates an index configuration.") + .subcommand_required(true) .subcommand( Command::new("search-settings") - .about("Updates an default search settings.") + .about("Updates default search settings.") .args(&[ arg!(--index "ID of the target index") .display_order(1) .required(true), - arg!(--"default-search-fields" "Comma separated list of fields that will be searched by default.") + arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. If not specified, default fields are removed and queries without target field will fail. Space-separated list, e.g. \"field1 field2\".") .display_order(2) + .num_args(1..) .required(false), - ]) + ])) .subcommand( Command::new("retention-policy") - .about("Set or unset a retention policy.") + .about("Configure or disable the retention policy.") .args(&[ arg!(--index "ID of the target index") .display_order(1) .required(true), - arg!(--"retention-policy-period" "Duration after which splits are dropped.") + arg!(--"period" "Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...)") .display_order(2) .required(false), - arg!(--"retention-policy-schedule" "Frequency at which the retention policy is evaluated and applied.") + arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") .display_order(3) .required(false), - arg!(--"disable-retention-policy" "Disables the retention policy.") + arg!(--"disable" "Disable the retention policy. Old indexed data will not be cleaned up anymore.") .display_order(4) .required(false), ]) ) - ) -} - -// Structured representation of the retention policy args -#[derive(Debug, Eq, PartialEq)] -pub enum RetentionPolicyUpdate { - Noop, - Update { - period: Option, - schedule: Option, - }, - Disable, -} - -impl RetentionPolicyUpdate { - pub fn apply_update( - self, - retention_policy_opt: Option, - ) -> anyhow::Result> { - match (self, retention_policy_opt) { - (Self::Noop, policy_opt) => Ok(policy_opt), - (Self::Update { period: None, .. }, None) => { - bail!("`--retention-policy-period` is required when creating a retention policy"); - } - ( - Self::Update { - period: None, - schedule, - }, - Some(mut policy), - ) => { - policy.evaluation_schedule = schedule.unwrap_or(policy.evaluation_schedule.clone()); - Ok(Some(policy)) - } - ( - Self::Update { - period: Some(period), - schedule, - }, - None, - ) => Ok(Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule.unwrap_or(RetentionPolicy::default_schedule()), - })), - ( - Self::Update { - period: Some(period), - schedule, - }, - Some(policy), - ) => Ok(Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule.unwrap_or(policy.evaluation_schedule.clone()), - })), - (Self::Disable, _) => Ok(None), - } - } } #[derive(Debug, Eq, PartialEq)] pub struct RetentionPolicyArgs { pub client_args: ClientArgs, pub index_id: String, - pub retention_policy_update: RetentionPolicyUpdate, + pub disable: bool, + pub period: Option, + pub schedule: Option, } #[derive(Debug, Eq, PartialEq)] @@ -143,7 +88,7 @@ impl IndexUpdateCliCommand { pub fn parse_args(mut matches: ArgMatches) -> anyhow::Result { let (subcommand, submatches) = matches .remove_subcommand() - .context("failed to parse index subcommand")?; + .context("failed to parse index update subcommand")?; match subcommand.as_str() { "retention-policy" => Self::parse_update_retention_policy_args(submatches), "search-settings" => Self::parse_update_search_settings_args(submatches), @@ -156,28 +101,15 @@ impl IndexUpdateCliCommand { let index_id = matches .remove_one::("index") .expect("`index` should be a required arg."); - let disable_retention_policy = matches.get_flag("disable-retention-policy"); - let retention_policy_period = matches.remove_one::("retention-policy-period"); - let retention_policy_schedule = matches.remove_one::("retention-policy-schedule"); - - let retention_policy_update = match ( - disable_retention_policy, - retention_policy_period, - retention_policy_schedule, - ) { - (true, Some(_), Some(_)) | (true, None, Some(_)) | (true, Some(_), None) => bail!( - "`--retention-policy-period` and `--retention-policy-schedule` cannot be used \ - together with `--disable-retention-policy`" - ), - (true, None, None) => RetentionPolicyUpdate::Disable, - (false, None, None) => RetentionPolicyUpdate::Noop, - (false, period, schedule) => RetentionPolicyUpdate::Update { period, schedule }, - }; - + let disable = matches.get_flag("disable"); + let period = matches.remove_one::("period"); + let schedule = matches.remove_one::("schedule"); Ok(Self::RetentionPolicy(RetentionPolicyArgs { client_args, index_id, - retention_policy_update, + disable, + period, + schedule, })) } @@ -209,16 +141,40 @@ pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::R println!("❯ Updating index retention policy..."); let qw_client = args.client_args.client(); let metadata = qw_client.indexes().get(&args.index_id).await?; - let new_retention_policy_opt = args - .retention_policy_update - .apply_update(metadata.index_config.retention_policy_opt)?; + let new_retention_policy_opt = match ( + args.disable, + args.period, + args.schedule, + metadata.index_config.retention_policy_opt, + ) { + (true, Some(_), Some(_), _) | (true, None, Some(_), _) | (true, Some(_), None, _) => { + bail!("`--period` and `--schedule` cannot be used together with `--disable`") + } + (false, None, None, _) => bail!("either `--period` or `--disable` must be specified"), + (false, None, Some(_), None) => { + bail!("`--period` is required when creating a retention policy") + } + (true, None, None, _) => None, + (false, None, Some(schedule), Some(policy)) => Some(RetentionPolicy { + retention_period: policy.retention_period, + evaluation_schedule: schedule, + }), + (false, Some(period), schedule_opt, None) => Some(RetentionPolicy { + retention_period: period, + evaluation_schedule: schedule_opt.unwrap_or(RetentionPolicy::default_schedule()), + }), + (false, Some(period), schedule_opt, Some(policy)) => Some(RetentionPolicy { + retention_period: period, + evaluation_schedule: schedule_opt.unwrap_or(policy.evaluation_schedule.clone()), + }), + }; if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { println!( "New retention policy: {}", serde_json::to_string(&new_retention_policy)? ); } else { - println!("Retention policy disabled."); + println!("Disable retention policy."); } qw_client .indexes() @@ -230,6 +186,7 @@ pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::R }, ) .await?; + println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); Ok(()) } @@ -239,8 +196,7 @@ pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Res let qw_client = args.client_args.client(); let metadata = qw_client.indexes().get(&args.index_id).await?; let search_settings = SearchSettings { - default_search_fields: args.default_search_fields.unwrap_or(vec![]), - ..metadata.index_config.search_settings + default_search_fields: args.default_search_fields.unwrap_or_default(), }; println!( "New search settings: {}", diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 0c94edf51ab..0a0e7e2de91 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -101,6 +101,7 @@ mod tests { use bytesize::ByteSize; use quickwit_cli::cli::{build_cli, CliCommand}; + use quickwit_cli::index::update::{IndexUpdateCliCommand, RetentionPolicyArgs}; use quickwit_cli::index::{ ClearIndexArgs, CreateIndexArgs, DeleteIndexArgs, DescribeIndexArgs, IndexCliCommand, IngestDocsArgs, SearchIndexArgs, @@ -188,6 +189,35 @@ mod tests { Ok(()) } + #[test] + fn test_cmd_update_subsubcommand() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from([ + "index", + "update", + "retention-policy", + "--index", + "my-index", + "--period", + "1 day", + ]) + .unwrap(); + let command = CliCommand::parse_cli_args(matches).unwrap(); + assert!(matches!( + command, + CliCommand::Index(IndexCliCommand::Update( + IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { + client_args: _, + index_id, + disable: false, + period: Some(period), + schedule: None, + }) + )) if &index_id == "my-index" && &period == "1 day" + )); + } + #[test] fn test_parse_ingest_v2_args() { let app = build_cli().no_binary_name(true); diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index fa8690dd140..588e97604ba 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -29,6 +29,7 @@ use clap::error::ErrorKind; use helpers::{TestEnv, TestStorageType}; use quickwit_cli::checklist::ChecklistError; use quickwit_cli::cli::build_cli; +use quickwit_cli::index::update::{update_retention_policy_cli, RetentionPolicyArgs}; use quickwit_cli::index::{ create_index_cli, delete_index_cli, search_index, CreateIndexArgs, DeleteIndexArgs, SearchIndexArgs, @@ -40,7 +41,7 @@ use quickwit_cli::ClientArgs; use quickwit_common::fs::get_cache_directory_path; use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; -use quickwit_config::{SourceInputFormat, CLI_SOURCE_ID}; +use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID}; use quickwit_metastore::{ ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, StageSplitsRequestExt, @@ -533,6 +534,73 @@ async fn test_search_index_cli() { assert_eq!(search_res.num_hits, 0); } +#[tokio::test] +async fn test_cmd_update_index() { + quickwit_common::setup_logging_for_tests(); + let index_id = append_random_suffix("test-update-cmd"); + let test_env = create_test_env(index_id.clone(), TestStorageType::LocalFileSystem) + .await + .unwrap(); + test_env.start_server().await.unwrap(); + create_logs_index(&test_env).await.unwrap(); + + local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + .await + .unwrap(); + + // add a policy + update_retention_policy_cli(RetentionPolicyArgs { + index_id: index_id.clone(), + client_args: ClientArgs { + cluster_endpoint: test_env.cluster_endpoint.clone(), + ..Default::default() + }, + disable: false, + period: Some(String::from("1 week")), + schedule: Some(String::from("daily")), + }) + .await + .unwrap(); + let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!( + index_metadata.index_config.retention_policy_opt, + Some(RetentionPolicy { + retention_period: String::from("1 week"), + evaluation_schedule: String::from("daily") + }) + ); + + // invalid args + update_retention_policy_cli(RetentionPolicyArgs { + index_id: index_id.clone(), + client_args: ClientArgs { + cluster_endpoint: test_env.cluster_endpoint.clone(), + ..Default::default() + }, + disable: true, + period: Some(String::from("a week")), + schedule: Some(String::from("daily")), + }) + .await + .unwrap_err(); + + // remove the policy + update_retention_policy_cli(RetentionPolicyArgs { + index_id, + client_args: ClientArgs { + cluster_endpoint: test_env.cluster_endpoint.clone(), + ..Default::default() + }, + disable: true, + period: None, + schedule: None, + }) + .await + .unwrap(); + let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!(index_metadata.index_config.retention_policy_opt, None); +} + #[tokio::test] async fn test_delete_index_cli_dry_run() { quickwit_common::setup_logging_for_tests(); From 43eeec2a3792bc132fb2617d9d38f3de74d117f3 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:30 +0000 Subject: [PATCH 05/13] Improve index config docs --- docs/configuration/index-config.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index 5a3483f2827..dfd2428add8 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -14,7 +14,7 @@ In addition to the `index_id`, the index configuration lets you define five item - The **search settings**: it defines the default search fields `default_search_fields`, a list of fields that Quickwit will search into if the user query does not explicitly target a field. - The **retention policy**: it defines how long Quickwit should keep the indexed data. If not specified, the data is stored forever. -In general, configuration is set at index creation and cannot be modified. Starting Quickwit 0.9, the search setttings and retention policy can be changed using the update endpoint. +In general, configuration is set at index creation and cannot be modified. Some specific subsets like the search settings and retention policy can be changed using the [update endpoint](../reference/rest-api.md) or the [CLI](../reference/cli.md). ## Config file format From aa85e5d8ed705d29841b35fb330fc90f28f3808f Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:30 +0000 Subject: [PATCH 06/13] Fix openapi spec --- .../src/index_api/rest_handler.rs | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index db837ddc312..33f2c1e87fe 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -55,6 +55,7 @@ use crate::with_arg; #[openapi( paths( create_index, + update_index, clear_index, delete_index, list_indexes_metadata, @@ -66,7 +67,7 @@ use crate::with_arg; toggle_source, delete_source, ), - components(schemas(ToggleSource, SplitsForDeletion, IndexStats)) + components(schemas(ToggleSource, SplitsForDeletion, IndexStats, IndexUpdates)) )] pub struct IndexApi; @@ -525,14 +526,14 @@ async fn create_index( .await } -/// The body of the index update request -/// -/// Remove #[serde(deny_unknown_fields)] when adding new fields to allow to ensure forward -/// compatibility. +/// The body of the index update request. All fields will be replaced in the +/// existing configuration. #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] -#[serde(deny_unknown_fields)] +#[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility pub struct IndexUpdates { pub search_settings: SearchSettings, + /// The + #[serde(rename = "retention_policy")] pub retention_policy_opt: Option, } @@ -553,14 +554,19 @@ fn update_index_handler( put, tag = "Indexes", path = "/indexes/{index_id}", - request_body = UpdateIndexRequest, + request_body = IndexUpdates, responses( - (status = 200, description = "Successfully marked splits for deletion.") + (status = 200, description = "Successfully updated the index configuration.") ), params( ("index_id" = String, Path, description = "The index ID to update."), ) )] +/// Updates an existing index. +/// +/// This endpoint has PUT semantics, which means that all the updatable fields of the index +/// configuration are replaced by the values specified in the request. In particular, omitting an +/// optional field like `retention_policy` will delete the associated configuration. async fn update_index( index_id: String, request: IndexUpdates, @@ -1775,7 +1781,7 @@ mod tests { #[tokio::test] async fn test_update_index() { - let metastore = metastore_for_test(); + let mut metastore = metastore_for_test(); let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured()); let mut node_config = NodeConfig::for_test(); node_config.default_index_root_uri = Uri::for_test("file:///default-index-root-uri"); @@ -1819,6 +1825,20 @@ mod tests { }); assert_json_include!(actual: resp_json, expected: expected_response_json); } + // check that the metastore was updated + let index_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string())) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + index_metadata + .index_config + .search_settings + .default_search_fields, + ["severity_text", "body"] + ); } #[tokio::test] From 0a4df1f89dd4a31376f803d260093715f2888ffd Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:30 +0000 Subject: [PATCH 07/13] Re-hide local-search command --- docs/reference/cli.md | 35 ------------------- .../quickwit-cli/src/generate_markdown.rs | 2 +- 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index ad73d7a59af..a870310059a 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -787,41 +787,6 @@ quickwit tool local-ingest | `--overwrite` | Overwrites pre-existing index. | | | `--transform-script` | VRL program to transform docs before ingesting. | | | `--keep-cache` | Does not clear local cache directory upon completion. | | -### tool local-search - -Searches an index locally. -`quickwit tool local-search [args]` - -*Synopsis* - -```bash -quickwit tool local-search - --index - --query - [--aggregation ] - [--max-hits ] - [--start-offset ] - [--search-fields ] - [--snippet-fields ] - [--start-timestamp ] - [--end-timestamp ] - [--sort-by-field ] -``` - -*Options* - -| Option | Description | Default | -|-----------------|-------------|--------:| -| `--index` | ID of the target index | | -| `--query` | Query expressed in natural query language ((barack AND obama) OR "president of united states"). Learn more on https://quickwit.io/docs/reference/search-language. | | -| `--aggregation` | JSON serialized aggregation request in tantivy/elasticsearch format. | | -| `--max-hits` | Maximum number of hits returned. | `20` | -| `--start-offset` | Offset in the global result set of the first hit returned. | `0` | -| `--search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field in the query. It overrides the default search fields defined in the index config. Space-separated list, e.g. "field1 field2". | | -| `--snippet-fields` | List of fields that Quickwit will return snippet highlight on. Space-separated list, e.g. "field1 field2". | | -| `--start-timestamp` | Filters out documents before that timestamp (time-series indexes only). | | -| `--end-timestamp` | Filters out documents after that timestamp (time-series indexes only). | | -| `--sort-by-field` | Sort by field. | | ### tool extract-split Downloads and extracts a split to a directory. diff --git a/quickwit/quickwit-cli/src/generate_markdown.rs b/quickwit/quickwit-cli/src/generate_markdown.rs index 024486e4b73..03ea5bf5cbb 100644 --- a/quickwit/quickwit-cli/src/generate_markdown.rs +++ b/quickwit/quickwit-cli/src/generate_markdown.rs @@ -197,7 +197,7 @@ fn generate_markdown_from_clap(command: &Command) { continue; } - let excluded_doc_commands = ["merge"]; + let excluded_doc_commands = ["merge", "local-search"]; for subcommand in command .get_subcommands() .filter(|subcommand| !excluded_doc_commands.contains(&subcommand.get_name())) From b2a92047fa144362f1140594b4fb89740062a854 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:31 +0000 Subject: [PATCH 08/13] Fix and improve update endpoint docs --- docs/reference/rest-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index 03125de3c47..c3319893027 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -315,7 +315,7 @@ The response is the index metadata of the created index, and the content type is PUT api/v1/indexes/ ``` -Update an index with the updatables parts of the `IndexConfig` payload. Note that this follows the PUT semantics and not PATCH, so all the fields must be specified and Unlike the create endpoint, this API accepts JSON only. +Update an index with the updatable subset of the `IndexConfig` payload. This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. #### PUT payload From 6b465284e57e1444158f260789fdf305817c73ef Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:31 +0000 Subject: [PATCH 09/13] Small adjustments --- quickwit/quickwit-cli/tests/cli.rs | 4 ---- .../metastore/file_backed/file_backed_index/mod.rs | 8 ++++---- .../src/metastore/file_backed/mod.rs | 14 +++++++------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 588e97604ba..848934c6a28 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -544,10 +544,6 @@ async fn test_cmd_update_index() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) - .await - .unwrap(); - // add a policy update_retention_policy_cli(RetentionPolicyArgs { index_id: index_id.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index c2b35f08e76..99d8221d36c 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,7 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{IndexConfig, SourceConfig, INGEST_V2_SOURCE_ID}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, @@ -213,9 +213,9 @@ impl FileBackedIndex { &self.metadata } - /// Mutable ref to index metadata. - pub fn metadata_mut(&mut self) -> &mut IndexMetadata { - &mut self.metadata + /// Mutable ref to index config. + pub fn index_config_mut(&mut self) -> &mut IndexConfig { + &mut self.metadata.index_config } /// Stages a single split. diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 035d916a33c..bc58cd15d43 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -467,15 +467,15 @@ impl MetastoreService for FileBackedMetastore { let metadata = self .mutate(index_uid, |index| { - let metadata = index.metadata_mut(); - if metadata.index_config.search_settings != search_settings - || metadata.index_config.retention_policy_opt != retention_policy_opt + let index_config = index.index_config_mut(); + if index_config.search_settings != search_settings + || index_config.retention_policy_opt != retention_policy_opt { - metadata.index_config.search_settings = search_settings; - metadata.index_config.retention_policy_opt = retention_policy_opt; - Ok(MutationOccurred::Yes(metadata.clone())) + index_config.search_settings = search_settings; + index_config.retention_policy_opt = retention_policy_opt; + Ok(MutationOccurred::Yes(index.metadata().clone())) } else { - Ok(MutationOccurred::No(metadata.clone())) + Ok(MutationOccurred::No(index.metadata().clone())) } }) .await?; From 1ee8112121f5becbf82e20a01d483828d242f399 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:31 +0000 Subject: [PATCH 10/13] Make update search-settings CLI more forward compatible --- docs/reference/cli.md | 4 ++-- quickwit/quickwit-cli/src/index/update.rs | 15 +++++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index a870310059a..651e162013d 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -193,7 +193,7 @@ Updates default search settings. ```bash quickwit index update search-settings --index - [--default-search-fields ] + --default-search-fields ``` *Options* @@ -201,7 +201,7 @@ quickwit index update search-settings | Option | Description | |-----------------|-------------| | `--index` | ID of the target index | -| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. If not specified, default fields are removed and queries without target field will fail. Space-separated list, e.g. "field1 field2". | +| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. "field1 field2". If no value is provided, existing defaults are removed and queries without target field will fail. | #### index update retention-policy Configure or disable the retention policy. diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index d94a3654e66..7482b85bb31 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -37,10 +37,10 @@ pub fn build_index_update_command() -> Command { arg!(--index "ID of the target index") .display_order(1) .required(true), - arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. If not specified, default fields are removed and queries without target field will fail. Space-separated list, e.g. \"field1 field2\".") + arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. \"field1 field2\". If no value is provided, existing defaults are removed and queries without target field will fail.") .display_order(2) - .num_args(1..) - .required(false), + .num_args(0..) + .required(true), ])) .subcommand( Command::new("retention-policy") @@ -75,7 +75,7 @@ pub struct RetentionPolicyArgs { pub struct SearchSettingsArgs { pub client_args: ClientArgs, pub index_id: String, - pub default_search_fields: Option>, + pub default_search_fields: Vec, } #[derive(Debug, Eq, PartialEq)] @@ -120,7 +120,10 @@ impl IndexUpdateCliCommand { .expect("`index` should be a required arg."); let default_search_fields = matches .remove_many::("default-search-fields") - .map(|values| values.collect()); + .map(|values| values.collect()) + // --default-search-fields should be made optional if other fields + // are added to SearchSettings + .expect("`default-search-fields` should be a required arg."); Ok(Self::SearchSettings(SearchSettingsArgs { client_args, index_id, @@ -196,7 +199,7 @@ pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Res let qw_client = args.client_args.client(); let metadata = qw_client.indexes().get(&args.index_id).await?; let search_settings = SearchSettings { - default_search_fields: args.default_search_fields.unwrap_or_default(), + default_search_fields: args.default_search_fields, }; println!( "New search settings: {}", From 41af001914bd74fca15941d2b9be1eb2329db104 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:31 +0000 Subject: [PATCH 11/13] Address all small review comments --- docs/configuration/index-config.md | 2 +- docs/reference/rest-api.md | 14 ++++++- quickwit/quickwit-cli/src/index/mod.rs | 2 +- quickwit/quickwit-cli/src/index/update.rs | 40 ++++++++++++++++++- quickwit/quickwit-cli/src/main.rs | 30 -------------- .../file_backed/file_backed_index/mod.rs | 17 ++++++-- .../src/metastore/file_backed/mod.rs | 9 ++--- .../quickwit-metastore/src/metastore/mod.rs | 18 ++++----- .../quickwit-metastore/src/tests/index.rs | 26 ++++++++---- .../src/index_api/rest_handler.rs | 1 - 10 files changed, 95 insertions(+), 64 deletions(-) diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index dfd2428add8..31097e2e597 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -14,7 +14,7 @@ In addition to the `index_id`, the index configuration lets you define five item - The **search settings**: it defines the default search fields `default_search_fields`, a list of fields that Quickwit will search into if the user query does not explicitly target a field. - The **retention policy**: it defines how long Quickwit should keep the indexed data. If not specified, the data is stored forever. -In general, configuration is set at index creation and cannot be modified. Some specific subsets like the search settings and retention policy can be changed using the [update endpoint](../reference/rest-api.md) or the [CLI](../reference/cli.md). +Configuration is generally set at index creation and cannot be modified, except for some specific attributes like the search settings and retention policy, which can be changed using the [update endpoint](../reference/rest-api.md) or the [CLI](../reference/cli.md). ## Config file format diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index c3319893027..0408851d73d 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -309,13 +309,13 @@ The response is the index metadata of the created index, and the content type is | `sources` | List of the index sources configurations. | `Array` | -### Update an index +### Update an index (search settings and retention policy only) ``` PUT api/v1/indexes/ ``` -Update an index with the updatable subset of the `IndexConfig` payload. This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. +Updates the search settings and retention policy of an index. This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. #### PUT payload @@ -341,6 +341,16 @@ curl -XPUT http://0.0.0.0:8080/api/v1/indexes --data @index_update.json -H "Cont } ``` +:::warning +Calling the update endpoint with the following payload will remove the current retention policy. +```json +{ + "search_settings": { + "default_search_fields": ["body"] + } +} +``` + #### Response The response is the index metadata of the updated index, and the content type is `application/json; charset=UTF-8.` diff --git a/quickwit/quickwit-cli/src/index/mod.rs b/quickwit/quickwit-cli/src/index/mod.rs index f9f4846c506..5102119bbe3 100644 --- a/quickwit/quickwit-cli/src/index/mod.rs +++ b/quickwit/quickwit-cli/src/index/mod.rs @@ -154,7 +154,7 @@ pub fn build_index_command() -> Command { .conflicts_with("wait"), Arg::new("commit-timeout") .long("commit-timeout") - .help("Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting which sets the maximum time before commiting splits after their creation.") + .help("Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before commiting splits after their creation.") .required(false) .global(true), ]) diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index 7482b85bb31..c79f30357ea 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -44,7 +44,7 @@ pub fn build_index_update_command() -> Command { ])) .subcommand( Command::new("retention-policy") - .about("Configure or disable the retention policy.") + .about("Configures or disables the retention policy.") .args(&[ arg!(--index "ID of the target index") .display_order(1) @@ -55,7 +55,7 @@ pub fn build_index_update_command() -> Command { arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") .display_order(3) .required(false), - arg!(--"disable" "Disable the retention policy. Old indexed data will not be cleaned up anymore.") + arg!(--"disable" "Disables the retention policy. Old indexed data will not be cleaned up anymore.") .display_order(4) .required(false), ]) @@ -218,3 +218,39 @@ pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Res println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); Ok(()) } + +#[cfg(test)] +mod test { + use super::*; + use crate::cli::{build_cli, CliCommand}; + use crate::index::IndexCliCommand; + + #[test] + fn test_cmd_update_subsubcommand() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from([ + "index", + "update", + "retention-policy", + "--index", + "my-index", + "--period", + "1 day", + ]) + .unwrap(); + let command = CliCommand::parse_cli_args(matches).unwrap(); + assert!(matches!( + command, + CliCommand::Index(IndexCliCommand::Update( + IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { + client_args: _, + index_id, + disable: false, + period: Some(period), + schedule: None, + }) + )) if &index_id == "my-index" && &period == "1 day" + )); + } +} diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 0a0e7e2de91..0c94edf51ab 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -101,7 +101,6 @@ mod tests { use bytesize::ByteSize; use quickwit_cli::cli::{build_cli, CliCommand}; - use quickwit_cli::index::update::{IndexUpdateCliCommand, RetentionPolicyArgs}; use quickwit_cli::index::{ ClearIndexArgs, CreateIndexArgs, DeleteIndexArgs, DescribeIndexArgs, IndexCliCommand, IngestDocsArgs, SearchIndexArgs, @@ -189,35 +188,6 @@ mod tests { Ok(()) } - #[test] - fn test_cmd_update_subsubcommand() { - let app = build_cli().no_binary_name(true); - let matches = app - .try_get_matches_from([ - "index", - "update", - "retention-policy", - "--index", - "my-index", - "--period", - "1 day", - ]) - .unwrap(); - let command = CliCommand::parse_cli_args(matches).unwrap(); - assert!(matches!( - command, - CliCommand::Index(IndexCliCommand::Update( - IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { - client_args: _, - index_id, - disable: false, - period: Some(period), - schedule: None, - }) - )) if &index_id == "my-index" && &period == "1 day" - )); - } - #[test] fn test_parse_ingest_v2_args() { let app = build_cli().no_binary_name(true); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 99d8221d36c..543af408902 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,7 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{IndexConfig, SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, @@ -213,9 +213,18 @@ impl FileBackedIndex { &self.metadata } - /// Mutable ref to index config. - pub fn index_config_mut(&mut self) -> &mut IndexConfig { - &mut self.metadata.index_config + /// Replace the search settings in the index config, returning whether a mutation occurred. + pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { + let is_mutation = self.metadata.index_config.search_settings != search_settings; + self.metadata.index_config.search_settings = search_settings; + is_mutation + } + + /// Replace the retention policy in the index config, returning whether a mutation occurred. + pub fn set_retention_policy(&mut self, retention_policy_opt: Option) -> bool { + let is_mutation = self.metadata.index_config.retention_policy_opt != retention_policy_opt; + self.metadata.index_config.retention_policy_opt = retention_policy_opt; + is_mutation } /// Stages a single split. diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index bc58cd15d43..f937b2b1e93 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -467,12 +467,9 @@ impl MetastoreService for FileBackedMetastore { let metadata = self .mutate(index_uid, |index| { - let index_config = index.index_config_mut(); - if index_config.search_settings != search_settings - || index_config.retention_policy_opt != retention_policy_opt - { - index_config.search_settings = search_settings; - index_config.retention_policy_opt = retention_policy_opt; + let search_settings_mutated = index.set_search_settings(search_settings); + let retention_policy_mutated = index.set_retention_policy(retention_policy_opt); + if search_settings_mutated || retention_policy_mutated { Ok(MutationOccurred::Yes(index.metadata().clone())) } else { Ok(MutationOccurred::No(index.metadata().clone())) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 671748f0a0e..886710e7033 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -204,11 +204,10 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { retention_policy_opt: &Option, ) -> MetastoreResult { let search_settings_json = serde_utils::to_json_str(&search_settings)?; - let retention_policy_json = if let Some(retention_policy) = &retention_policy_opt { - Some(serde_utils::to_json_str(retention_policy)?) - } else { - None - }; + let retention_policy_json = retention_policy_opt + .as_ref() + .map(serde_utils::to_json_str) + .transpose()?; let update_request = UpdateIndexRequest { index_uid: Some(index_uid.into()), @@ -223,11 +222,10 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { } fn deserialize_retention_policy(&self) -> MetastoreResult> { - if let Some(retention_policy_json) = &self.retention_policy_json { - serde_utils::from_json_str(retention_policy_json).map(Some) - } else { - Ok(None) - } + self.retention_policy_json + .as_ref() + .map(|policy| serde_utils::from_json_str(policy)) + .transpose() } } diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index f5e00718906..a75ff115297 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -25,12 +25,13 @@ // - list_indexes // - delete_index -use std::vec; +use std::collections::BTreeSet; use quickwit_common::rand::append_random_suffix; use quickwit_config::{ IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, }; +use quickwit_doc_mapper::FieldMappingType; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, StageSplitsRequest, @@ -107,13 +108,24 @@ pub async fn test_metastore_update_index< .deserialize_index_metadata() .unwrap(); + // use all fields that are currently not set as default + let current_defaults = BTreeSet::from_iter( + index_metadata + .index_config + .search_settings + .default_search_fields, + ); let new_search_setting = SearchSettings { - default_search_fields: vec!["body".to_string(), "owner".to_string()], + default_search_fields: index_metadata + .index_config + .doc_mapping + .field_mappings + .iter() + .filter(|f| matches!(f.mapping_type, FieldMappingType::Text(..))) + .filter(|f| !current_defaults.contains(&f.name)) + .map(|f| f.name.clone()) + .collect(), }; - assert_ne!( - index_metadata.index_config.search_settings, new_search_setting, - "original and updated value are the same, test became inefficient" - ); let new_retention_policy_opt = Some(RetentionPolicy { retention_period: String::from("3 days"), @@ -124,7 +136,7 @@ pub async fn test_metastore_update_index< "original and updated value are the same, test became inefficient" ); - // run same update twice to check indempotence, then None as a corner case check + // run same update twice to check idempotence, then None as a corner case check for loop_retention_policy_opt in [ new_retention_policy_opt.clone(), new_retention_policy_opt.clone(), diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 33f2c1e87fe..a64ff918540 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -532,7 +532,6 @@ async fn create_index( #[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility pub struct IndexUpdates { pub search_settings: SearchSettings, - /// The #[serde(rename = "retention_policy")] pub retention_policy_opt: Option, } From d5bde068d3a880b1384b244d1352855896667439 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:32 +0000 Subject: [PATCH 12/13] Refactor mutate_index_metadata to use MutationOccured --- .../file_backed/file_backed_index/mod.rs | 2 +- .../src/metastore/file_backed/mod.rs | 5 +- .../src/metastore/index_metadata/mod.rs | 6 +- .../src/metastore/postgres/metastore.rs | 70 ++++++++++--------- 4 files changed, 44 insertions(+), 39 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 543af408902..a7d802997b2 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -506,7 +506,7 @@ impl FileBackedIndex { } /// Deletes the source. Returns whether a mutation occurred. - pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult { + pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<()> { self.metadata.delete_source(source_id) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index f937b2b1e93..a0ec0f51264 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -663,9 +663,8 @@ impl MetastoreService for FileBackedMetastore { let index_uid = request.index_uid(); self.mutate(index_uid, |index| { - index - .delete_source(&request.source_id) - .map(MutationOccurred::from) + index.delete_source(&request.source_id)?; + Ok(MutationOccurred::Yes(())) }) .await?; Ok(EmptyResponse {}) diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index b07557a0748..501e487fa95 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -126,8 +126,8 @@ impl IndexMetadata { Ok(mutation_occurred) } - /// Deletes a source from the index. Returns whether the index was modified (true). - pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult { + /// Deletes a source from the index. + pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<()> { self.sources.remove(source_id).ok_or_else(|| { MetastoreError::NotFound(EntityKind::Source { index_id: self.index_id().to_string(), @@ -135,7 +135,7 @@ impl IndexMetadata { }) })?; self.checkpoint.remove_source(source_id); - Ok(true) + Ok(()) } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c2132c2fdad..cf97c8b6869 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -60,6 +60,7 @@ use super::utils::{append_query_filters, establish_connection}; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, }; +use crate::file_backed::MutationOccurred; use crate::metastore::postgres::utils::split_maturity_timestamp; use crate::metastore::{PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE}; use crate::{ @@ -288,26 +289,29 @@ macro_rules! run_with_tx { }}; } -async fn mutate_index_metadata Result>( +async fn mutate_index_metadata( tx: &mut Transaction<'_, Postgres>, index_uid: IndexUid, mutate_fn: M, -) -> MetastoreResult +) -> MetastoreResult where MetastoreError: From, + M: FnOnce(IndexMetadata) -> Result, E>, { let index_id = &index_uid.index_id; - let mut index_metadata = index_metadata(tx, index_id).await?; + let index_metadata = index_metadata(tx, index_id).await?; if index_metadata.index_uid != index_uid { return Err(MetastoreError::NotFound(EntityKind::Index { index_id: index_id.to_string(), })); } - let mutation_occurred = mutate_fn(&mut index_metadata)?; - if !mutation_occurred { - return Ok(mutation_occurred); - } - let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| { + + let mutated_index_metadata = match mutate_fn(index_metadata)? { + MutationOccurred::Yes(index_metadata) => index_metadata, + MutationOccurred::No(index_metadata) => return Ok(index_metadata), + }; + + let index_metadata_json = serde_json::to_string(&mutated_index_metadata).map_err(|error| { MetastoreError::JsonSerializeError { struct_name: "IndexMetadata".to_string(), message: error.to_string(), @@ -329,7 +333,7 @@ where index_id: index_id.to_string(), })); } - Ok(mutation_occurred) + Ok(mutated_index_metadata) } #[async_trait] @@ -406,32 +410,25 @@ impl MetastoreService for PostgresqlMetastore { let retention_policy_opt = request.deserialize_retention_policy()?; let search_settings = request.deserialize_search_settings()?; let index_uid: IndexUid = request.index_uid().clone(); - let mut mutated_metadata_opt = None; - let mutated_metadata_ref = &mut mutated_metadata_opt; - run_with_tx!(self.connection_pool, tx, { + let updated_metadata = run_with_tx!(self.connection_pool, tx, { mutate_index_metadata::( tx, index_uid, - |index_metadata: &mut IndexMetadata| { - let mutated = if index_metadata.index_config.search_settings != search_settings + |mut index_metadata: IndexMetadata| { + if index_metadata.index_config.search_settings != search_settings || index_metadata.index_config.retention_policy_opt != retention_policy_opt { index_metadata.index_config.search_settings = search_settings; index_metadata.index_config.retention_policy_opt = retention_policy_opt; - true + Ok(MutationOccurred::Yes(index_metadata)) } else { - false - }; - *mutated_metadata_ref = Some(index_metadata.clone()); - Ok(mutated) + Ok(MutationOccurred::No(index_metadata)) + } }, ) - .await?; - Ok(()) + .await })?; - let mutated_metadata = - mutated_metadata_opt.expect("Mutated IndexMetadata should be set by transaction"); - IndexMetadataResponse::try_from_index_metadata(&mutated_metadata) + IndexMetadataResponse::try_from_index_metadata(&updated_metadata) } #[instrument(skip_all, fields(index_id=%request.index_uid()))] @@ -975,9 +972,9 @@ impl MetastoreService for PostgresqlMetastore { mutate_index_metadata::( tx, index_uid, - |index_metadata: &mut IndexMetadata| { + |mut index_metadata: IndexMetadata| { index_metadata.add_source(source_config)?; - Ok(true) + Ok(MutationOccurred::Yes(index_metadata)) }, ) .await?; @@ -993,8 +990,12 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid, |index_metadata| { - index_metadata.toggle_source(&request.source_id, request.enable) + mutate_index_metadata(tx, index_uid, |mut index_metadata| { + if index_metadata.toggle_source(&request.source_id, request.enable)? { + Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata)) + } else { + Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata)) + } }) .await?; Ok(()) @@ -1010,8 +1011,9 @@ impl MetastoreService for PostgresqlMetastore { let index_uid: IndexUid = request.index_uid().clone(); let source_id = request.source_id.clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid.clone(), |index_metadata| { - index_metadata.delete_source(&source_id) + mutate_index_metadata(tx, index_uid.clone(), |mut index_metadata| { + index_metadata.delete_source(&source_id)?; + Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata)) }) .await?; sqlx::query( @@ -1038,8 +1040,12 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid, |index_metadata| { - Ok::<_, MetastoreError>(index_metadata.checkpoint.reset_source(&request.source_id)) + mutate_index_metadata(tx, index_uid, |mut index_metadata| { + if index_metadata.checkpoint.reset_source(&request.source_id) { + Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata)) + } else { + Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata)) + } }) .await?; Ok(()) From d68309e50a595b8bad38b88e68ee384e2b5f4124 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 15 Apr 2024 07:17:32 +0000 Subject: [PATCH 13/13] Apply new round of PR comments --- .../file_backed/file_backed_index/mod.rs | 4 +- .../quickwit-metastore/src/metastore/mod.rs | 10 +-- .../src/metastore/postgres/metastore.rs | 69 ++++++++----------- 3 files changed, 37 insertions(+), 46 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index a7d802997b2..5f7f91c81e0 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -213,14 +213,14 @@ impl FileBackedIndex { &self.metadata } - /// Replace the search settings in the index config, returning whether a mutation occurred. + /// Replaces the search settings in the index config, returning whether a mutation occurred. pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { let is_mutation = self.metadata.index_config.search_settings != search_settings; self.metadata.index_config.search_settings = search_settings; is_mutation } - /// Replace the retention policy in the index config, returning whether a mutation occurred. + /// Replaces the retention policy in the index config, returning whether a mutation occurred. pub fn set_retention_policy(&mut self, retention_policy_opt: Option) -> bool { let is_mutation = self.metadata.index_config.retention_policy_opt != retention_policy_opt; self.metadata.index_config.retention_policy_opt = retention_policy_opt; diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 886710e7033..3a231f0029c 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -181,19 +181,19 @@ impl CreateIndexResponseExt for CreateIndexResponse { /// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. pub trait UpdateIndexRequestExt { - /// Updates a new [`UpdateIndexRequest`] from an [`IndexConfig`]. + /// Creates a new [`UpdateIndexRequest`] from the different updated fields. fn try_from_updates( index_uid: impl Into, search_settings: &SearchSettings, retention_policy_opt: &Option, ) -> MetastoreResult; - /// Deserializes the `index_config_json` field of a [`UpdateIndexRequest`] into an - /// [`IndexConfig`]. + /// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a + /// [`SearchSettings`] object. fn deserialize_search_settings(&self) -> MetastoreResult; - /// Deserializes the `source_configs_json` field of a [`UpdateIndexRequest`] into an - /// `Vec` of [`SourceConfig`]. + /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a + /// [`RetentionPolicy`] object. fn deserialize_retention_policy(&self) -> MetastoreResult>; } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index cf97c8b6869..9202c24cbea 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -296,22 +296,21 @@ async fn mutate_index_metadata( ) -> MetastoreResult where MetastoreError: From, - M: FnOnce(IndexMetadata) -> Result, E>, + M: FnOnce(&mut IndexMetadata) -> Result, E>, { let index_id = &index_uid.index_id; - let index_metadata = index_metadata(tx, index_id).await?; + let mut index_metadata = index_metadata(tx, index_id).await?; if index_metadata.index_uid != index_uid { return Err(MetastoreError::NotFound(EntityKind::Index { index_id: index_id.to_string(), })); } - let mutated_index_metadata = match mutate_fn(index_metadata)? { - MutationOccurred::Yes(index_metadata) => index_metadata, - MutationOccurred::No(index_metadata) => return Ok(index_metadata), - }; + if let MutationOccurred::No(()) = mutate_fn(&mut index_metadata)? { + return Ok(index_metadata); + } - let index_metadata_json = serde_json::to_string(&mutated_index_metadata).map_err(|error| { + let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| { MetastoreError::JsonSerializeError { struct_name: "IndexMetadata".to_string(), message: error.to_string(), @@ -333,7 +332,7 @@ where index_id: index_id.to_string(), })); } - Ok(mutated_index_metadata) + Ok(index_metadata) } #[async_trait] @@ -411,21 +410,17 @@ impl MetastoreService for PostgresqlMetastore { let search_settings = request.deserialize_search_settings()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_metadata = run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata::( - tx, - index_uid, - |mut index_metadata: IndexMetadata| { - if index_metadata.index_config.search_settings != search_settings - || index_metadata.index_config.retention_policy_opt != retention_policy_opt - { - index_metadata.index_config.search_settings = search_settings; - index_metadata.index_config.retention_policy_opt = retention_policy_opt; - Ok(MutationOccurred::Yes(index_metadata)) - } else { - Ok(MutationOccurred::No(index_metadata)) - } - }, - ) + mutate_index_metadata::(tx, index_uid, |index_metadata| { + if index_metadata.index_config.search_settings != search_settings + || index_metadata.index_config.retention_policy_opt != retention_policy_opt + { + index_metadata.index_config.search_settings = search_settings; + index_metadata.index_config.retention_policy_opt = retention_policy_opt; + Ok(MutationOccurred::Yes(())) + } else { + Ok(MutationOccurred::No(())) + } + }) .await })?; IndexMetadataResponse::try_from_index_metadata(&updated_metadata) @@ -969,14 +964,10 @@ impl MetastoreService for PostgresqlMetastore { let source_config = request.deserialize_source_config()?; let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata::( - tx, - index_uid, - |mut index_metadata: IndexMetadata| { - index_metadata.add_source(source_config)?; - Ok(MutationOccurred::Yes(index_metadata)) - }, - ) + mutate_index_metadata::(tx, index_uid, |index_metadata| { + index_metadata.add_source(source_config)?; + Ok(MutationOccurred::Yes(())) + }) .await?; Ok(()) })?; @@ -990,11 +981,11 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid, |mut index_metadata| { + mutate_index_metadata(tx, index_uid, |index_metadata| { if index_metadata.toggle_source(&request.source_id, request.enable)? { - Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata)) + Ok::<_, MetastoreError>(MutationOccurred::Yes(())) } else { - Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata)) + Ok::<_, MetastoreError>(MutationOccurred::No(())) } }) .await?; @@ -1011,9 +1002,9 @@ impl MetastoreService for PostgresqlMetastore { let index_uid: IndexUid = request.index_uid().clone(); let source_id = request.source_id.clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid.clone(), |mut index_metadata| { + mutate_index_metadata(tx, index_uid.clone(), |index_metadata| { index_metadata.delete_source(&source_id)?; - Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata)) + Ok::<_, MetastoreError>(MutationOccurred::Yes(())) }) .await?; sqlx::query( @@ -1040,11 +1031,11 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata(tx, index_uid, |mut index_metadata| { + mutate_index_metadata(tx, index_uid, |index_metadata| { if index_metadata.checkpoint.reset_source(&request.source_id) { - Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata)) + Ok::<_, MetastoreError>(MutationOccurred::Yes(())) } else { - Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata)) + Ok::<_, MetastoreError>(MutationOccurred::No(())) } }) .await?;