Merged (changes from 4 commits)
49 changes: 16 additions & 33 deletions src/connectors/kafka/processor.rs
@@ -16,10 +16,9 @@
*
*/

use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use async_trait::async_trait;
use chrono::Utc;
use futures_util::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer};
use serde_json::Value;
@@ -58,37 +57,10 @@ impl ParseableSinkProcessor {
let stream = PARSEABLE.get_stream(stream_name)?;
let schema = stream.get_schema_raw();
let time_partition = stream.get_time_partition();
let custom_partition = stream.get_custom_partition();
let static_schema_flag = stream.get_static_schema_flag();
let schema_version = stream.get_schema_version();

let (json_vec, total_payload_size) = Self::json_vec(records);
let batch_json_event = json::Event {
data: Value::Array(json_vec),
};

let (rb, is_first) = batch_json_event.into_recordbatch(
&schema,
static_schema_flag,
time_partition.as_ref(),
schema_version,
)?;

let p_event = ParseableEvent {
rb,
stream_name: stream_name.to_string(),
origin_format: "json",
origin_size: total_payload_size,
is_first_event: is_first,
parsed_timestamp: Utc::now().naive_utc(),
time_partition: None,
custom_partition_values: HashMap::new(),
stream_type: StreamType::UserDefined,
};

Ok(p_event)
}

fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
let mut json_vec = Vec::with_capacity(records.len());
let mut total_payload_size = 0u64;

@@ -99,19 +71,30 @@ impl ParseableSinkProcessor {
}
}

(json_vec, total_payload_size)
let p_event = json::Event::new(Value::Array(json_vec)).into_event(
stream_name.to_string(),
total_payload_size,
&schema,
static_schema_flag,
custom_partition.as_ref(),
time_partition.as_ref(),
schema_version,
StreamType::UserDefined,
)?;

Ok(p_event)
}
}

#[async_trait]
impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
let len = records.len();
debug!("Processing {} records", len);
debug!("Processing {len} records");

self.build_event_from_chunk(&records).await?.process()?;

debug!("Processed {} records", len);
debug!("Processed {len} records");
Ok(())
}
}
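The body of `json_vec` is collapsed in the view above. For orientation, here is a hedged sketch of what it plausibly does given the visible fragments; the `payload` field on `ConsumerRecord`, and its exact type, are assumptions rather than details taken from this PR:

```rust
use serde_json::Value;

// Hedged sketch only: the real body is collapsed in the diff above.
// Assumes each ConsumerRecord carries an optional raw-JSON payload.
fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
    let mut json_vec = Vec::with_capacity(records.len());
    let mut total_payload_size = 0u64;

    // Sum payload sizes and keep only records that parse as JSON.
    for payload in records.iter().filter_map(|r| r.payload.as_ref()) {
        total_payload_size += payload.len() as u64;
        if let Ok(json) = serde_json::from_slice::<Value>(payload) {
            json_vec.push(json);
        }
    }

    (json_vec, total_payload_size)
}
```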
128 changes: 125 additions & 3 deletions src/event/format/json.rs
@@ -23,17 +23,28 @@ use anyhow::anyhow;
use arrow_array::RecordBatch;
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
use arrow_schema::{DataType, Field, Fields, Schema};
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
use itertools::Itertools;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::EventFormat;
use crate::{metadata::SchemaVersion, utils::arrow::get_field};
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};

pub struct Event {
pub data: Value,
pub json: Value,
ingestion_time: DateTime<Utc>,
}

impl Event {
pub fn new(json: Value) -> Self {
Self {
json,
ingestion_time: Utc::now(),
}
}
}

impl EventFormat for Event {
@@ -52,7 +63,7 @@ impl EventFormat for Event {
// incoming event may be a single json or a json array
// but Data (type defined above) is a vector of json values
// hence we need to convert the incoming event to a vector of json values
let value_arr = match self.data {
let value_arr = match self.json {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => unreachable!("flatten would have failed beforehand"),
@@ -120,6 +131,83 @@ impl EventFormat for Event {
Ok(None) => unreachable!("all records are added to one rb"),
}
}

/// Converts a JSON event into a Parseable Event
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<super::Event, anyhow::Error> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
let custom_partitions = custom_partition.split(',').collect_vec();
get_custom_partition_values(&self.json, &custom_partitions)
}
None => HashMap::new(),
};

let parsed_timestamp = match time_partition {
Some(time_partition) => get_parsed_timestamp(&self.json, time_partition)?,
_ => self.ingestion_time.naive_utc(),
};

let (rb, is_first_event) = self.into_recordbatch(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
)?;

Ok(super::Event {
rb,
stream_name,
origin_format: "json",
origin_size,
is_first_event,
parsed_timestamp,
time_partition: None,
custom_partition_values,
stream_type,
})
}
}

pub fn get_custom_partition_values(
json: &Value,
custom_partition_list: &[&str],
) -> HashMap<String, String> {
let mut custom_partition_values: HashMap<String, String> = HashMap::new();
for custom_partition_field in custom_partition_list {
// Note: assumes every listed partition field is present in the event; a missing field panics here.
let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
let custom_partition_value = match custom_partition_value {
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
Value::String(s) => s,
_ => "".to_string(),
};
custom_partition_values.insert(
custom_partition_field.trim().to_string(),
custom_partition_value,
);
}
custom_partition_values
}

fn get_parsed_timestamp(
json: &Value,
time_partition: &str,
) -> Result<NaiveDateTime, anyhow::Error> {
let current_time = json
.get(time_partition)
.ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;

Ok(parsed_time.naive_utc())
}

// Returns arrow schema with the fields that are present in the request body
@@ -225,3 +313,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
}
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use serde_json::json;

use super::*;

#[test]
fn parse_time_partition_from_value() {
let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
let parsed = get_parsed_timestamp(&json, "timestamp");

let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
assert_eq!(parsed.unwrap(), expected);
}

#[test]
fn time_partition_not_in_json() {
let json = json!({"hello": "world!"});
let parsed = get_parsed_timestamp(&json, "timestamp");

assert!(parsed.is_err());
}

#[test]
fn time_partition_not_parseable_as_datetime() {
let json = json!({"timestamp": "not time"});
let parsed = get_parsed_timestamp(&json, "timestamp");

assert!(parsed.is_err());
}
}
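Taken together, the new `into_event` path derives the custom partition values and the parsed timestamp directly from the payload instead of leaving callers to fill them in. A minimal usage sketch, assuming a stream with a `device` custom partition and a `timestamp` time partition; the stream name, size, and `storage_schema` binding are illustrative:

```rust
use serde_json::json;

// (inside a function returning anyhow::Result<_>)
let event = Event::new(json!({
    "device": "d1",
    "timestamp": "2025-05-15T15:30:00Z",
    "level": "info"
}));

let p_event = event.into_event(
    "demo".to_string(),             // stream name (assumed)
    64,                             // origin size in bytes (assumed)
    &storage_schema,                // assumed fetched from the stream handle
    false,                          // static_schema_flag
    Some(&"device".to_string()),    // comma-separated custom partition list
    Some(&"timestamp".to_string()), // time partition field
    SchemaVersion::V1,              // assumed current schema version
    StreamType::UserDefined,
)?;

// Partition metadata is now populated from the payload itself:
// custom_partition_values == {"device": "d1"}
// parsed_timestamp        == 2025-05-15T15:30:00 (naive UTC)
```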
16 changes: 15 additions & 1 deletion src/event/format/mod.rs
@@ -32,10 +32,11 @@ use serde_json::Value;

use crate::{
metadata::SchemaVersion,
storage::StreamType,
utils::arrow::{get_field, get_timestamp_array, replace_columns},
};

use super::DEFAULT_TIMESTAMP_KEY;
use super::{Event, DEFAULT_TIMESTAMP_KEY};

pub mod json;

@@ -172,6 +173,19 @@ pub trait EventFormat: Sized {
}
true
}

#[allow(clippy::too_many_arguments)]
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<Event, AnyError>;
}

pub fn get_existing_field_names(
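With `into_event` on the `EventFormat` trait itself, call sites can stay generic over the input format. A hedged sketch of such a call site; the `ingest` helper below is hypothetical and not part of this PR:

```rust
use std::{collections::HashMap, sync::Arc};

use arrow_schema::Field;

// Hypothetical helper: drives any EventFormat implementor through the
// single into_event() entry point with default partitioning settings.
fn ingest<E: EventFormat>(
    event: E,
    stream_name: &str,
    origin_size: u64,
    storage_schema: &HashMap<String, Arc<Field>>,
) -> Result<Event, AnyError> {
    event.into_event(
        stream_name.to_string(),
        origin_size,
        storage_schema,
        false,             // static_schema_flag: assume a dynamic schema
        None,              // no custom partitions
        None,              // no time partition: ingestion time is used
        SchemaVersion::V1, // assumed current schema version
        StreamType::UserDefined,
    )
}
```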