diff --git a/server/src/event/format.rs b/server/src/event/format.rs index 8f5971a13..35724bbd7 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, Error as AnyError}; use arrow_array::{RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use chrono::DateTime; +use serde_json::Value; use crate::utils::{self, arrow::get_field}; @@ -171,3 +173,30 @@ pub fn update_field_type_in_schema( .collect(); Arc::new(Schema::new(new_schema)) } + +pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema { + let new_schema: Vec = schema + .fields() + .iter() + .map(|field| { + if field.data_type() == &DataType::Utf8 { + if let Value::Object(map) = &value { + if let Some(Value::String(s)) = map.get(field.name()) { + if DateTime::parse_from_rfc3339(s).is_ok() { + // Update the field's data type to Timestamp + return Field::new( + field.name().clone(), + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ); + } + } + } + } + // Return the original field if no update is needed + Field::new(field.name(), field.data_type().clone(), true) + }) + .collect(); + + Schema::new(new_schema) +} diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 0597738a5..fd2280470 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -22,6 +22,7 @@ use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::create_update_stream; use crate::alerts::Alerts; +use crate::event::format::update_data_type_to_datetime; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::STREAM_INFO; @@ -36,6 +37,7 @@ use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; use 
actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; +use arrow_json::reader::infer_json_schema_from_iterator; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; @@ -89,6 +91,26 @@ pub async fn list(_: HttpRequest) -> impl Responder { web::Json(res) } +pub async fn detect_schema(body: Bytes) -> Result { + let body_val: Value = serde_json::from_slice(&body)?; + let value_arr: Vec = match body_val { + Value::Array(arr) => arr, + value @ Value::Object(_) => vec![value], + _ => { + return Err(StreamError::Custom { + msg: "please send json events as part of the request".to_string(), + status: StatusCode::BAD_REQUEST, + }) + } + }; + + let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap(); + for value in value_arr { + schema = update_data_type_to_datetime(schema, value); + } + Ok((web::Json(schema), StatusCode::OK)) +} + pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let schema = STREAM_INFO.schema(&stream_name)?; diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 015e01afc..34f6d46a8 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -258,6 +258,17 @@ impl QueryServer { web::resource("") .route(web::get().to(logstream::list).authorize(Action::ListStream)), ) + .service( + web::scope("/schema/detect").service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::post() + .to(logstream::detect_schema) + .authorize(Action::DetectSchema), + ), + ), + ) .service( web::scope("/{logstream}") .service( diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index d1e4b9aad..ef8467f67 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -292,6 +292,17 @@ impl 
Server { web::resource("") .route(web::get().to(logstream::list).authorize(Action::ListStream)), ) + .service( + web::scope("/schema/detect").service( + web::resource("") + // POST "/logstream/schema/detect" ==> Detect schema of JSON events + .route( + web::post() + .to(logstream::detect_schema) + .authorize(Action::DetectSchema), + ), + ), + ) .service( web::scope("/{logstream}") .service( diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 522c5a895..0e8f1ab24 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -25,6 +25,7 @@ pub enum Action { CreateStream, ListStream, GetStreamInfo, + DetectSchema, GetSchema, GetStats, DeleteStream, @@ -140,6 +141,7 @@ impl RoleBuilder { | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema + | Action::DetectSchema | Action::GetStats | Action::GetRetention | Action::PutRetention @@ -214,6 +216,7 @@ pub mod model { Action::DeleteStream, Action::ListStream, Action::GetStreamInfo, + Action::DetectSchema, Action::GetSchema, Action::GetStats, Action::GetRetention,