From f4ae7d83c08da3d349126cf80fda3a53bf6cd022 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 26 Mar 2024 23:51:36 +0530 Subject: [PATCH 1/3] feat: allow historical ingestion only when date column provided in header x-p-time-partition and server time are within the same minute, no change for default ingestions --- server/src/event/format.rs | 4 +- server/src/event/format/json.rs | 3 +- server/src/handlers.rs | 1 + server/src/handlers/http/ingest.rs | 70 +++++++++++---- server/src/handlers/http/logstream.rs | 35 ++++++-- server/src/metadata.rs | 14 ++- server/src/storage/object_storage.rs | 16 +++- server/src/storage/staging.rs | 57 +++++++++---- server/src/utils/json.rs | 7 +- server/src/utils/json/flatten.rs | 117 +++++++++++++++++++++----- 10 files changed, 252 insertions(+), 72 deletions(-) diff --git a/server/src/event/format.rs b/server/src/event/format.rs index cd11e440b..c4f67912f 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -41,13 +41,15 @@ pub trait EventFormat: Sized { fn to_data( self, schema: HashMap>, + time_partition: Option, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; fn decode(data: Self::Data, schema: Arc) -> Result; fn into_recordbatch( self, schema: HashMap>, + time_partition: Option, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?; + let (data, mut schema, is_first, tags, metadata) = self.to_data(schema, time_partition)?; if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)); diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs index edeb395b3..699065a45 100644 --- a/server/src/event/format/json.rs +++ b/server/src/event/format/json.rs @@ -45,8 +45,9 @@ impl EventFormat for Event { fn to_data( self, schema: HashMap>, + time_partition: Option, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(self.data)?; + let data = flatten_json_body(self.data, time_partition)?; let stream_schema = schema; // incoming event may be a single json or a json array diff --git a/server/src/handlers.rs b/server/src/handlers.rs index a8ae29e9f..456f5fc68 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -23,6 +23,7 @@ const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; +const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 231e8eefb..ef63b4bfa 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -99,7 +99,14 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .ok_or(PostError::StreamNotFound(stream_name.clone()))? .schema .clone(); - into_event_batch(req, body, schema)? + + let time_partition = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .time_partition + .clone(); + + into_event_batch(req, body, schema, time_partition)? }; event::Event { @@ -119,6 +126,7 @@ fn into_event_batch( req: HttpRequest, body: Bytes, schema: HashMap>, + time_partition: Option, ) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; @@ -129,7 +137,7 @@ fn into_event_batch( tags, metadata, }; - let (rb, is_first) = event.into_recordbatch(schema)?; + let (rb, is_first) = event.into_recordbatch(schema, time_partition)?; Ok((size, rb, is_first)) } @@ -138,7 +146,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string()).await?; + super::logstream::create_stream(stream_name.to_string(), "").await?; Ok(()) } @@ -241,6 +249,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, ) .unwrap(); @@ -287,6 +296,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, ) .unwrap(); @@ -320,8 +330,13 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (_, rb, _) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -353,10 +368,13 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,) - .is_err() - ); + assert!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None + ) + .is_err()); } #[test] @@ -374,8 +392,13 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (_, rb, _) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -391,6 +414,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None ) .is_err()) } @@ -419,6 +443,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, ) .unwrap(); @@ -472,6 +497,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, ) .unwrap(); @@ -521,8 +547,13 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (_, rb, _) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -566,6 +597,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, ) .unwrap(); @@ -612,10 +644,13 @@ mod tests { .into_iter(), ); - assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,) - .is_err() - ); + assert!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None + ) + .is_err()); } #[test] @@ -647,6 +682,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, ) .unwrap(); diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 3da02c1fc..e2bca0193 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -18,18 +18,18 @@ use std::fs; -use actix_web::http::StatusCode; -use actix_web::{web, HttpRequest, Responder}; -use chrono::Utc; -use serde_json::Value; - use self::error::{CreateStreamError, StreamError}; use crate::alerts::Alerts; +use crate::handlers::TIME_PARTITION_KEY; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage::{retention::Retention, LogStream, StorageDir}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; +use actix_web::http::StatusCode; +use actix_web::{web, HttpRequest, Responder}; +use chrono::Utc; +use serde_json::Value; pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -108,6 +108,16 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest) -> Result { + let time_partition = if let Some((_, time_partition_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == TIME_PARTITION_KEY) + { + time_partition_name.to_str().unwrap() + } else { + "" + }; + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if metadata::STREAM_INFO.stream_exists(&stream_name) { @@ -119,7 +129,7 @@ pub async fn put_stream(req: HttpRequest) -> Result status: StatusCode::BAD_REQUEST, }); } else { - create_stream(stream_name).await?; + create_stream(stream_name, time_partition).await?; } Ok(("log stream created", StatusCode::OK)) @@ -326,13 +336,16 @@ fn remove_id_from_alerts(value: &mut Value) { } } -pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> { +pub async fn create_stream( + stream_name: String, + time_partition: &str, +) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage.create_stream(&stream_name).await { + if let Err(err) = storage.create_stream(&stream_name, time_partition).await { return Err(CreateStreamError::Storage { stream_name, err }); } @@ -344,7 +357,11 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> let stream_meta = stream_meta.unwrap(); let created_at = stream_meta.created_at; - metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + created_at, + time_partition.to_string(), + ); Ok(()) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 1d2ac39df..b20478dbc 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -92,6 +92,13 @@ impl StreamInfo { .map(|metadata| metadata.cache_enabled) } + pub fn get_time_partition(&self, stream_name: &str) -> Result, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.time_partition.clone()) + } + pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); let stream = map @@ -143,7 +150,7 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String, created_at: String) { + pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { created_at: if created_at.is_empty() { @@ -151,6 +158,11 @@ impl StreamInfo { } else { created_at }, + time_partition: if time_partition.is_empty() { + None + } else { + Some(time_partition) + }, ..Default::default() }; map.insert(stream_name, metadata); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7797bb8eb..c22ef9236 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -106,11 +106,20 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + async fn create_stream( + &self, + stream_name: &str, + time_partition: &str, + ) -> Result<(), ObjectStorageError> { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); let permission = Permisssion::new(CONFIG.parseable.username.clone()); format.permissions = vec![permission]; + if time_partition.is_empty() { + format.time_partition = None; + } else { + format.time_partition = Some(time_partition.to_string()); + } let format_json = to_bytes(&format); self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty())) .await?; @@ -325,8 +334,11 @@ pub trait ObjectStorage: Sync + 'static { let cache_enabled = STREAM_INFO .cache_enabled(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let time_partition = STREAM_INFO + .get_time_partition(stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; let dir = StorageDir::new(stream); - let schema = convert_disk_files_to_parquet(stream, &dir) + let schema = convert_disk_files_to_parquet(stream, &dir, time_partition) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; if let Some(schema) = schema { diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index f965765fd..65c016ef8 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -176,6 +176,7 @@ pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { pub fn convert_disk_files_to_parquet( stream: &str, dir: &StorageDir, + time_partition: Option, ) -> Result, MoveDataError> { let mut schemas = Vec::new(); @@ -200,10 +201,14 @@ pub fn convert_disk_files_to_parquet( } let record_reader = MergedReverseRecordReader::try_new(&files).unwrap(); - - let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - let props = parquet_writer_props().build(); let merged_schema = record_reader.merged_schema(); + let mut index_time_partition: usize = 0; + if let Some(time_partition) = time_partition.as_ref() { + index_time_partition = merged_schema.index_of(time_partition).unwrap(); + } + let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + let props = parquet_writer_props(time_partition.clone(), index_time_partition).build(); + schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; @@ -229,19 +234,39 @@ pub fn convert_disk_files_to_parquet( } } -fn parquet_writer_props() -> WriterPropertiesBuilder { - WriterProperties::builder() - .set_max_row_group_size(CONFIG.parseable.row_group_size) - .set_compression(CONFIG.parseable.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]), - Encoding::DELTA_BINARY_PACKED, - ) - .set_sorting_columns(Some(vec![SortingColumn { - column_idx: 0, - descending: true, - nulls_first: true, - }])) +fn parquet_writer_props( + time_partition: Option, + index_time_partition: usize, +) -> WriterPropertiesBuilder { + let index_time_partition: i32 = index_time_partition as i32; + + if let Some(time_partition) = time_partition { + WriterProperties::builder() + .set_max_row_group_size(CONFIG.parseable.row_group_size) + .set_compression(CONFIG.parseable.parquet_compression.into()) + .set_column_encoding( + ColumnPath::new(vec![time_partition]), + Encoding::DELTA_BYTE_ARRAY, + ) + .set_sorting_columns(Some(vec![SortingColumn { + column_idx: index_time_partition, + descending: true, + nulls_first: true, + }])) + } else { + WriterProperties::builder() + .set_max_row_group_size(CONFIG.parseable.row_group_size) + .set_compression(CONFIG.parseable.parquet_compression.into()) + .set_column_encoding( + ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]), + Encoding::DELTA_BINARY_PACKED, + ) + .set_sorting_columns(Some(vec![SortingColumn { + column_idx: index_time_partition, + descending: true, + nulls_first: true, + }])) + } } #[derive(Debug, thiserror::Error)] diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index 0f18d4bf7..082b4e823 100644 --- a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -21,8 +21,11 @@ use serde_json::Value; pub mod flatten; -pub fn flatten_json_body(body: serde_json::Value) -> Result { - flatten::flatten(body, "_") +pub fn flatten_json_body( + body: serde_json::Value, + time_partition: Option, +) -> Result { + flatten::flatten(body, "_", time_partition) } pub fn convert_to_string(value: &Value) -> Value { diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs index 4b7a21556..bc7a33a21 100644 --- a/server/src/utils/json/flatten.rs +++ b/server/src/utils/json/flatten.rs @@ -17,26 +17,45 @@ */ use anyhow::anyhow; +use chrono::{DateTime, Timelike, Utc}; use itertools::Itertools; use serde_json::map::Map; use serde_json::value::Value; -pub fn flatten(nested_value: Value, separator: &str) -> Result { +pub fn flatten( + nested_value: Value, + separator: &str, + time_partition: Option, +) -> Result { match nested_value { Value::Object(nested_dict) => { - let mut map = Map::new(); - flatten_object(&mut map, None, nested_dict, separator)?; - Ok(Value::Object(map)) + let validate_time_partition_result = + validate_time_partition(Value::Object(nested_dict.clone()), time_partition.clone()); + if validate_time_partition_result.is_ok() { + let mut map = Map::new(); + flatten_object(&mut map, None, nested_dict, separator)?; + Ok(Value::Object(map)) + } else { + Err(anyhow!(validate_time_partition_result.unwrap_err())) + } } Value::Array(mut arr) => { for _value in &mut arr { - let value = std::mem::replace(_value, Value::Null); - let mut map = Map::new(); - let Value::Object(obj) = value else { - return Err(anyhow!("Expected object in array of objects")); - }; - flatten_object(&mut map, None, obj, separator)?; - *_value = Value::Object(map); + let value: Value = _value.clone(); + let validate_time_partition_result = + validate_time_partition(value, time_partition.clone()); + + if validate_time_partition_result.is_ok() { + let value = std::mem::replace(_value, Value::Null); + let mut map = Map::new(); + let Value::Object(obj) = value else { + return Err(anyhow!("Expected object in array of objects")); + }; + flatten_object(&mut map, None, obj, separator)?; + *_value = Value::Object(map); + } else { + return Err(anyhow!(validate_time_partition_result.unwrap_err())); + } } Ok(Value::Array(arr)) } @@ -44,6 +63,58 @@ pub fn flatten(nested_value: Value, separator: &str) -> Result, +) -> Result { + if time_partition.is_none() { + Ok(true) + } else { + let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); + if body_timestamp.is_some() { + if body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .is_ok() + { + let parsed_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + + if parsed_timestamp.date() == Utc::now().naive_utc().date() + && parsed_timestamp.hour() == Utc::now().naive_utc().hour() + && parsed_timestamp.minute() == Utc::now().naive_utc().minute() + { + Ok(true) + } else { + Err(anyhow!(format!( + "field {} and server time are not same", + time_partition.unwrap() + ))) + } + } else { + Err(anyhow!(format!( + "field {} is not in the correct datetime format", + time_partition.unwrap() + ))) + } + } else { + Err(anyhow!(format!( + "ingestion failed as field {} is not part of the log", + time_partition.unwrap() + ))) + } + } +} + pub fn flatten_with_parent_prefix( nested_value: Value, prefix: &str, @@ -158,19 +229,19 @@ mod tests { #[test] fn flatten_single_key_string() { let obj = json!({"key": "value"}); - assert_eq!(obj.clone(), flatten(obj, "_").unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap()); } #[test] fn flatten_single_key_int() { let obj = json!({"key": 1}); - assert_eq!(obj.clone(), flatten(obj, "_").unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap()); } #[test] fn flatten_multiple_key_value() { let obj = json!({"key1": 1, "key2": "value2"}); - assert_eq!(obj.clone(), flatten(obj, "_").unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap()); } #[test] @@ -178,7 +249,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key":"value"}}); assert_eq!( json!({"key": "value", "nested_key.key": "value"}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -187,7 +258,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); assert_eq!( json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -196,7 +267,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); assert_eq!( json!({"key": "value", "nested_key.key1": [1,2,3]}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -205,7 +276,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"]}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -214,7 +285,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -223,7 +294,7 @@ mod tests { let obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -232,7 +303,7 @@ mod tests { let obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } @@ -241,14 +312,14 @@ mod tests { let obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}), - flatten(obj, ".").unwrap() + flatten(obj, ".", None).unwrap() ); } #[test] fn flatten_mixed_object() { let obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(obj, ".").is_err()); + assert!(flatten(obj, ".", None).is_err()); } #[test] From 9af09a7be723b332065b3eb6a18be0e8302bc3e0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 27 Mar 2024 11:22:26 +0530 Subject: [PATCH 2/3] cargo clippy fix because of rust upgrade --- server/src/query/filter_optimizer.rs | 6 ++---- server/src/rbac/map.rs | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/server/src/query/filter_optimizer.rs b/server/src/query/filter_optimizer.rs index 7a826537a..9be90cfd2 100644 --- a/server/src/query/filter_optimizer.rs +++ b/server/src/query/filter_optimizer.rs @@ -116,10 +116,8 @@ impl FilterOptimizerRule { let mut patterns = self.literals.iter().map(|literal| { Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal))) }); - - let Some(mut filter_expr) = patterns.next() else { - return None; - }; + + let mut filter_expr = patterns.next()?; for expr in patterns { filter_expr = or(filter_expr, expr) } diff --git a/server/src/rbac/map.rs b/server/src/rbac/map.rs index 6a82b9f9d..d2c035a9b 100644 --- a/server/src/rbac/map.rs +++ b/server/src/rbac/map.rs @@ -159,9 +159,8 @@ impl Sessions { // remove a specific session pub fn remove_session(&mut self, key: &SessionKey) -> Option { - let Some((user, _)) = self.active_sessions.remove(key) else { - return None; - }; + + let (user, _) = self.active_sessions.remove(key)?; if let Some(items) = self.user_sessions.get_mut(&user) { items.retain(|(session, _)| session != key); From 601f798015bebfcfdb21405b7c4a6d27e2a47e95 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 27 Mar 2024 11:25:00 +0530 Subject: [PATCH 3/3] cargo fmt changes --- server/src/query/filter_optimizer.rs | 2 +- server/src/rbac/map.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/query/filter_optimizer.rs b/server/src/query/filter_optimizer.rs index 9be90cfd2..fc087a2e8 100644 --- a/server/src/query/filter_optimizer.rs +++ b/server/src/query/filter_optimizer.rs @@ -116,7 +116,7 @@ impl FilterOptimizerRule { let mut patterns = self.literals.iter().map(|literal| { Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal))) }); - + let mut filter_expr = patterns.next()?; for expr in patterns { filter_expr = or(filter_expr, expr) diff --git a/server/src/rbac/map.rs b/server/src/rbac/map.rs index d2c035a9b..b5b92e5a7 100644 --- a/server/src/rbac/map.rs +++ b/server/src/rbac/map.rs @@ -159,7 +159,6 @@ impl Sessions { // remove a specific session pub fn remove_session(&mut self, key: &SessionKey) -> Option { - let (user, _) = self.active_sessions.remove(key)?; if let Some(items) = self.user_sessions.get_mut(&user) {