@@ -30,6 +30,9 @@ use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3030use crate :: storage:: retention:: Retention ;
3131use crate :: storage:: { StreamInfo , StreamType } ;
3232use crate :: utils:: actix:: extract_session_key_from_req;
33+ use crate :: utils:: json:: flatten:: {
34+ self , convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
35+ } ;
3336use crate :: { stats, validator, LOCK_EXPECT } ;
3437
3538use actix_web:: http:: StatusCode ;
@@ -102,22 +105,42 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
102105}
103106
104107pub async fn detect_schema ( Json ( json) : Json < Value > ) -> Result < impl Responder , StreamError > {
105- let log_records: Vec < Value > = match json {
106- Value :: Array ( arr) => arr,
107- value @ Value :: Object ( _) => vec ! [ value] ,
108- _ => {
108+ // flatten before infer
109+ if !has_more_than_max_allowed_levels ( & json, 1 ) {
110+ //perform generic flattening, return error if failed to flatten
111+ let mut flattened_json = match generic_flattening ( & json) {
112+ Ok ( flattened) => convert_to_array ( flattened) . unwrap ( ) ,
113+ Err ( e) => {
114+ return Err ( StreamError :: Custom {
115+ msg : e. to_string ( ) ,
116+ status : StatusCode :: BAD_REQUEST ,
117+ } )
118+ }
119+ } ;
120+ if let Err ( err) = flatten:: flatten ( & mut flattened_json, "_" , None , None , None , false ) {
109121 return Err ( StreamError :: Custom {
110- msg : "please send json events as part of the request" . to_string ( ) ,
122+ msg : err . to_string ( ) ,
111123 status : StatusCode :: BAD_REQUEST ,
112- } )
124+ } ) ;
113125 }
114- } ;
115-
116- let mut schema = Arc :: new ( infer_json_schema_from_iterator ( log_records. iter ( ) . map ( Ok ) ) . unwrap ( ) ) ;
117- for log_record in log_records {
118- schema = override_data_type ( schema, log_record, SchemaVersion :: V1 ) ;
126+ let flattened_json_arr = match flattened_json {
127+ Value :: Array ( arr) => arr,
128+ value @ Value :: Object ( _) => vec ! [ value] ,
129+ _ => unreachable ! ( "flatten would have failed beforehand" ) ,
130+ } ;
131+ let mut schema =
132+ Arc :: new ( infer_json_schema_from_iterator ( flattened_json_arr. iter ( ) . map ( Ok ) ) . unwrap ( ) ) ;
133+ for flattened_json in flattened_json_arr {
134+ schema = override_data_type ( schema, flattened_json, SchemaVersion :: V1 ) ;
135+ }
136+ Ok ( ( web:: Json ( schema) , StatusCode :: OK ) )
137+ } else {
138+ // error out if the JSON is heavily nested
139+ Err ( StreamError :: Custom {
140+ msg : "heavily nested, cannot flatten this JSON" . to_string ( ) ,
141+ status : StatusCode :: BAD_REQUEST ,
142+ } )
119143 }
120- Ok ( ( web:: Json ( schema) , StatusCode :: OK ) )
121144}
122145
123146pub async fn get_schema ( stream_name : Path < String > ) -> Result < impl Responder , StreamError > {
0 commit comments