14 changes: 6 additions & 8 deletions src/connectors/kafka/processor.rs
@@ -42,10 +42,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn build_event_from_chunk(
-        &self,
-        records: &[ConsumerRecord],
-    ) -> anyhow::Result<ParseableEvent> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<()> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
@@ -73,7 +70,7 @@ impl ParseableSinkProcessor {
             schema_version,
         )?;
 
-        let p_event = ParseableEvent {
+        ParseableEvent {
             rb,
             stream_name: stream_name.to_string(),
             origin_format: "json",
@@ -83,9 +80,10 @@
             time_partition: None,
             custom_partition_values: HashMap::new(),
             stream_type: StreamType::UserDefined,
-        };
+        }
+        .process(&stream)?;
 
-        Ok(p_event)
+        Ok(())
     }
 
     fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
@@ -109,7 +107,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {} records", len);
 
-        self.build_event_from_chunk(&records).await?.process()?;
+        self.process_event_from_chunk(&records).await?;
 
         debug!("Processed {} records", len);
         Ok(())
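Net effect in this file: the processor now builds the Arrow event and processes it in place, instead of handing a `ParseableEvent` back to the trait impl. The hunks above elide where `stream` comes into scope before `.process(&stream)?`; a hedged guess is that the hidden lines resolve the handle once, via the same lookup `Event::process` used to perform internally (see `src/event/mod.rs` below):

```rust
// Hypothetical reconstruction of the elided lines in process_event_from_chunk:
// resolve the stream handle once, up front, so the event no longer has to
// fetch it from the global map. Only PARSEABLE.get_or_create_stream is
// attested elsewhere in this diff; its placement here is an assumption.
let stream = PARSEABLE.get_or_create_stream(stream_name);
```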
41 changes: 12 additions & 29 deletions src/event/mod.rs
@@ -17,17 +17,17 @@
  *
  */
 
-pub mod format;
+use std::{collections::HashMap, sync::Arc};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::Field;
+use chrono::NaiveDateTime;
+use error::EventError;
 use itertools::Itertools;
-use std::sync::Arc;
 
-use self::error::EventError;
-use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType, LOCK_EXPECT};
-use chrono::NaiveDateTime;
-use std::collections::HashMap;
+use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
 
+pub mod format;
+
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

@@ -46,7 +46,7 @@ pub struct Event {
 
 // Events holds the schema related to each event for a single log stream
 impl Event {
-    pub fn process(self) -> Result<(), EventError> {
+    pub fn process(self, stream: &Stream) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -60,10 +60,10 @@ impl Event {
         }
 
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            stream.commit_schema(self.rb.schema())?;
         }
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -84,10 +84,10 @@ impl Event {
         Ok(())
     }
 
-    pub fn process_unchecked(&self) -> Result<(), EventError> {
+    pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
         let key = get_schema_key(&self.rb.schema().fields);
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
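Both `process` and `process_unchecked` now borrow a `Stream` rather than resolving one from the global `PARSEABLE` map on each call, so the lookup happens once at the call site and the dependency is explicit in the signature. Caller-side, the migration presumably looks like this (a sketch; the concrete handle type returned by `get_or_create_stream` is not shown in this diff):

```rust
// Before: the event resolved its own stream from the global state.
event.process()?;

// After (sketch): the caller resolves the handle once and passes it down.
let stream = PARSEABLE.get_or_create_stream(&stream_name);
event.process(&stream)?;
```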
@@ -109,23 +109,6 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     format!("{hash:x}")
 }
 
-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), EventError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {
     use arrow_schema::ArrowError;
 
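The deleted free function is not simply gone: the earlier hunk now calls `stream.commit_schema(self.rb.schema())`, so the schema-merge logic has presumably moved onto `Stream`. A minimal sketch of what that method might look like, assuming `Stream` owns the same `metadata` lock and `schema` map the old accessor chain reached through `PARSEABLE.streams` (those internals are not shown in this diff):

```rust
use std::sync::Arc;

use arrow_schema::{Fields, Schema};

impl Stream {
    /// Hypothetical port of the deleted commit_schema. The `metadata` field,
    /// its `schema` map, and LOCK_EXPECT are assumptions carried over from
    /// the removed code above; only the schema argument is confirmed by the
    /// new call site.
    pub fn commit_schema(&self, schema: Arc<Schema>) -> Result<(), EventError> {
        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
        let map = &mut metadata.schema;

        // Merge the incoming schema into the stream's current one, then
        // rebuild the name -> field map from the merged result.
        let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
        let merged = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
        map.clear();
        map.extend(merged.fields.iter().map(|f| (f.name().clone(), f.clone())));
        Ok(())
    }
}
```

Note that the `stream_name` parameter disappears: the method already knows which stream it belongs to, which is what lets `Event::process` drop its own name-based lookup.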