From 12cc99f7a4b2cad3a2e7c13303c3189f2eae122f Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Oct 2024 23:54:52 +0530 Subject: [PATCH 1/2] feat: detect schema for a log event The server can detect the schema for a sample log event before the stream is created, which helps the user create the static schema. New API endpoint `POST /logstream/schema/detect`, with a JSON object as the body, returns the schema. E.g., for request JSON - ``` { "datetime": "2024-10-21T05:40:58.280Z", "b": 2.0, "c": 1, "host": "backend", "a": false, "id": "duzrixscdpavbdgc", "message": "Tom is interested in mathematics.", "method": "GET", "p_metadata": "state=fatal", "p_tags": "environment=development", "p_timestamp": "2024-10-15T14:00:00+05:30", "referrer": "http://www.facebook.com/", "status": 404, "user-identifier": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36 OPR/91.0.4516.20" } ``` API returns ``` { "fields": [ { "name": "datetime", "data_type": { "Timestamp": [ "Millisecond", null ] }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "b", "data_type": "Float64", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "c", "data_type": "Int64", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "host", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "a", "data_type": "Boolean", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "id", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "message", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "method", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "p_metadata", "data_type": "Utf8", "nullable": true, "dict_id": 0, 
"dict_is_ordered": false, "metadata": {} }, { "name": "p_tags", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "p_timestamp", "data_type": { "Timestamp": [ "Millisecond", null ] }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "referrer", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "status", "data_type": "Int64", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "user-identifier", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ], "metadata": {} } ``` The console then asks the user to confirm this schema or change it as per their requirements; once confirmed, they can create the stream with the static schema. --- server/src/event/format.rs | 29 +++++++++++++++++++ server/src/handlers/http/logstream.rs | 21 ++++++++++++++ .../src/handlers/http/modal/query_server.rs | 11 +++++++ server/src/handlers/http/modal/server.rs | 11 +++++++ server/src/rbac/role.rs | 3 ++ 5 files changed, 75 insertions(+) diff --git a/server/src/event/format.rs b/server/src/event/format.rs index 8f5971a13..35724bbd7 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, Error as AnyError}; use arrow_array::{RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use chrono::DateTime; +use serde_json::Value; use crate::utils::{self, arrow::get_field}; @@ -171,3 +173,30 @@ pub fn update_field_type_in_schema( .collect(); Arc::new(Schema::new(new_schema)) } + +pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema { + let new_schema: Vec<Field> = schema + .fields() + .iter() + .map(|field| { + if field.data_type() == &DataType::Utf8 { + if let Value::Object(map) = &value { + if let Some(Value::String(s)) = map.get(field.name()) { + if 
DateTime::parse_from_rfc3339(s).is_ok() { + // Update the field's data type to Timestamp + return Field::new( + field.name().clone(), + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ); + } + } + } + } + // Return the original field if no update is needed + Field::new(field.name(), field.data_type().clone(), true) + }) + .collect(); + + Schema::new(new_schema) +} diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 0597738a5..31e84d1ba 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -22,6 +22,7 @@ use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::create_update_stream; use crate::alerts::Alerts; +use crate::event::format::update_data_type_to_datetime; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::STREAM_INFO; @@ -36,6 +37,7 @@ use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; +use arrow_json::reader::infer_json_schema_from_iterator; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; @@ -89,6 +91,25 @@ pub async fn list(_: HttpRequest) -> impl Responder { web::Json(res) } +pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> { + let body_val: Value = serde_json::from_slice(&body)?; + let value_arr: Vec<Value> = match body_val { + value @ Value::Object(_) => vec![value], + _ => { + return Err(StreamError::Custom { + msg: "please send one json event as part of the request".to_string(), + status: StatusCode::BAD_REQUEST, + }) + } + }; + + let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap(); + for value in value_arr { + schema = update_data_type_to_datetime(schema, value); + } + Ok((web::Json(schema), 
StatusCode::OK)) +} + pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let schema = STREAM_INFO.schema(&stream_name)?; diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 015e01afc..34f6d46a8 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -258,6 +258,17 @@ impl QueryServer { web::resource("") .route(web::get().to(logstream::list).authorize(Action::ListStream)), ) + .service( + web::scope("/schema/detect").service( + web::resource("") + // POST "/logstream/schema/detect" ==> Detect schema from sample log event(s) + .route( + web::post() + .to(logstream::detect_schema) + .authorize(Action::DetectSchema), + ), + ), + ) .service( web::scope("/{logstream}") .service( diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index d1e4b9aad..ef8467f67 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -292,6 +292,17 @@ impl Server { web::resource("") .route(web::get().to(logstream::list).authorize(Action::ListStream)), ) + .service( + web::scope("/schema/detect").service( + web::resource("") + // POST "/logstream/schema/detect" ==> Detect schema from sample log event(s) + .route( + web::post() + .to(logstream::detect_schema) + .authorize(Action::DetectSchema), + ), + ), + ) .service( web::scope("/{logstream}") .service( diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 522c5a895..0e8f1ab24 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -25,6 +25,7 @@ pub enum Action { CreateStream, ListStream, GetStreamInfo, + DetectSchema, GetSchema, GetStats, DeleteStream, @@ -140,6 +141,7 @@ impl RoleBuilder { | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema + | Action::DetectSchema | Action::GetStats | Action::GetRetention | Action::PutRetention 
@@ -214,6 +216,7 @@ pub mod model { Action::DeleteStream, Action::ListStream, Action::GetStreamInfo, + Action::DetectSchema, Action::GetSchema, Action::GetStats, Action::GetRetention, From 553e37bff079dfb3093fc745a78fd568ac8684f4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 22 Oct 2024 10:28:58 +0530 Subject: [PATCH 2/2] detect schema for json array --- server/src/handlers/http/logstream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 31e84d1ba..fd2280470 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -94,10 +94,11 @@ pub async fn list(_: HttpRequest) -> impl Responder { pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> { let body_val: Value = serde_json::from_slice(&body)?; let value_arr: Vec<Value> = match body_val { + Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], _ => { return Err(StreamError::Custom { - msg: "please send one json event as part of the request".to_string(), + msg: "please send json events as part of the request".to_string(), status: StatusCode::BAD_REQUEST, }) }