@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
2121use serde_json:: { Map , Value } ;
2222use std:: str;
2323
24+ use crate :: utils:: json:: flatten:: { generic_flattening, has_more_than_max_allowed_levels} ;
25+
2426#[ derive( Serialize , Deserialize , Debug ) ]
2527#[ serde( rename_all = "camelCase" ) ]
2628pub struct Message {
@@ -57,29 +59,56 @@ struct Data {
5759// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
5860// "timestamp": "1704964113659"
5961// }
60- pub fn flatten_kinesis_logs ( message : Message ) -> Vec < Value > {
62+ pub async fn flatten_kinesis_logs ( message : Message ) -> Result < Vec < Value > , anyhow :: Error > {
6163 let mut vec_kinesis_json = Vec :: new ( ) ;
6264
6365 for record in message. records . iter ( ) {
64- let bytes = STANDARD . decode ( record. data . clone ( ) ) . unwrap ( ) ;
65- let json_string: String = String :: from_utf8 ( bytes) . unwrap ( ) ;
66- let json: serde_json:: Value = serde_json:: from_str ( & json_string) . unwrap ( ) ;
67- let mut kinesis_json: Map < String , Value > = match serde_json:: from_value ( json) {
68- Ok ( value) => value,
69- Err ( error) => panic ! ( "Failed to deserialize JSON: {}" , error) ,
70- } ;
71-
72- kinesis_json. insert (
73- "requestId" . to_owned ( ) ,
74- Value :: String ( message. request_id . clone ( ) ) ,
75- ) ;
76- kinesis_json. insert (
77- "timestamp" . to_owned ( ) ,
78- Value :: String ( message. timestamp . to_string ( ) ) ,
79- ) ;
66+ let bytes = STANDARD . decode ( record. data . clone ( ) ) ?;
67+ if let Ok ( json_string) = String :: from_utf8 ( bytes) {
68+ let json: serde_json:: Value = serde_json:: from_str ( & json_string) ?;
69+ // Check if the JSON has more than the allowed levels of nesting
70+ // If it has less than or equal to the allowed levels, we flatten it.
71+ // If it has more than the allowed levels, we just push it as is
72+ // without flattening or modifying it.
73+ if !has_more_than_max_allowed_levels ( & json, 1 ) {
74+ let flattened_json_arr = generic_flattening ( & json) ?;
75+ for flattened_json in flattened_json_arr {
76+ let mut kinesis_json: Map < String , Value > =
77+ serde_json:: from_value ( flattened_json) ?;
78+ kinesis_json. insert (
79+ "requestId" . to_owned ( ) ,
80+ Value :: String ( message. request_id . clone ( ) ) ,
81+ ) ;
82+ kinesis_json. insert (
83+ "timestamp" . to_owned ( ) ,
84+ Value :: String ( message. timestamp . to_string ( ) ) ,
85+ ) ;
8086
81- vec_kinesis_json. push ( Value :: Object ( kinesis_json) ) ;
87+ vec_kinesis_json. push ( Value :: Object ( kinesis_json) ) ;
88+ }
89+ } else {
90+ // If the JSON has more than the allowed levels, we just push it as is
91+ // without flattening or modifying it.
92+ // This is a fallback to ensure we don't lose data.
93+ tracing:: warn!(
94+ "Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record." ,
95+ message. request_id, message. timestamp
96+ ) ;
97+ vec_kinesis_json. push ( json) ;
98+ }
99+ } else {
100+ tracing:: error!(
101+ "Failed to decode base64 data for kinesis log with requestId {} and timestamp {}" ,
102+ message. request_id,
103+ message. timestamp
104+ ) ;
105+ return Err ( anyhow:: anyhow!(
106+ "Failed to decode base64 data for record with requestId {} and timestamp {}" ,
107+ message. request_id,
108+ message. timestamp
109+ ) ) ;
110+ }
82111 }
83112
84- vec_kinesis_json
113+ Ok ( vec_kinesis_json)
85114}
0 commit comments