@@ -47,30 +47,17 @@ use crate::{
// Header keys handled internally and therefore skipped when copying request
// headers into custom fields (presumably at the extraction site — confirm at use sites)
const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
// Upper bound on how many custom fields a single request may contribute
const MAX_CUSTOM_FIELDS: usize = 10;
// Maximum accepted length for a single custom field value
const MAX_FIELD_VALUE_LENGTH: usize = 100;
// Maximum allowed count for fields in a dataset
pub const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;
5052
5153pub async fn flatten_and_push_logs (
5254 json : Value ,
5355 stream_name : & str ,
5456 log_source : & LogSource ,
5557 p_custom_fields : & HashMap < String , String > ,
5658) -> Result < ( ) , PostError > {
57- // fetch the storage schema for the stream
58- let schema = PARSEABLE . get_stream ( stream_name) ?. get_schema ( ) ;
59- //fetch the fields count from the schema
60- let fields_count = schema. fields ( ) . len ( ) ;
61- if fields_count > PARSEABLE . options . dataset_fields_allowed_limit {
62- tracing:: error!(
63- "Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset." ,
64- stream_name,
65- fields_count,
66- PARSEABLE . options. dataset_fields_allowed_limit) ;
67- // Return an error if the fields count exceeds the limit
68- return Err ( PostError :: FieldsLimitExceeded (
69- stream_name. to_string ( ) ,
70- fields_count,
71- PARSEABLE . options . dataset_fields_allowed_limit ,
72- ) ) ;
73- }
59+ // Verify the dataset fields count
60+ verify_dataset_fields_count ( stream_name) ?;
7461
7562 match log_source {
7663 LogSource :: Kinesis => {
@@ -223,6 +210,39 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, Strin
223210 p_custom_fields
224211}
225212
213+ fn verify_dataset_fields_count ( stream_name : & str ) -> Result < ( ) , PostError > {
214+ let fields_count = PARSEABLE
215+ . get_stream ( stream_name) ?
216+ . get_schema ( )
217+ . fields ( )
218+ . len ( ) ;
219+ let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64 ;
220+ // Check if the fields count exceeds the warn threshold
221+ if fields_count > dataset_fields_warn_threshold as usize {
222+ tracing:: warn!(
223+ "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields, Parseable recommends creating a new dataset." ,
224+ dataset_fields_warn_threshold,
225+ stream_name,
226+ dataset_fields_warn_threshold) ;
227+ }
228+ // Check if the fields count exceeds the limit
229+ // Return an error if the fields count exceeds the limit
230+ if fields_count > PARSEABLE . options . dataset_fields_allowed_limit {
231+ tracing:: error!(
232+ "Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}, Please create a new dataset." ,
233+ stream_name,
234+ fields_count,
235+ PARSEABLE . options. dataset_fields_allowed_limit) ;
236+ // Return an error if the fields count exceeds the limit
237+ return Err ( PostError :: FieldsLimitExceeded (
238+ stream_name. to_string ( ) ,
239+ fields_count,
240+ PARSEABLE . options . dataset_fields_allowed_limit ,
241+ ) ) ;
242+ }
243+ Ok ( ( ) )
244+ }
245+
226246#[ cfg( test) ]
227247mod tests {
228248 use super :: * ;
0 commit comments