Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, LogSource, Metadata, Tags};
use super::{EventFormat, LogSource};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
};

pub struct Event {
pub data: Value,
pub tags: Tags,
pub metadata: Metadata,
}

impl EventFormat for Event {
Expand All @@ -53,7 +51,7 @@ impl EventFormat for Event {
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
let data = flatten_json_body(
self.data,
None,
Expand Down Expand Up @@ -119,7 +117,7 @@ impl EventFormat for Event {
));
}

Ok((value_arr, schema, is_first, self.tags, self.metadata))
Ok((value_arr, schema, is_first))
}

// Convert the Data type (defined above) to arrow record batch
Expand Down
48 changes: 5 additions & 43 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,18 @@ use std::{
};

use anyhow::{anyhow, Error as AnyError};
use arrow_array::{RecordBatch, StringArray};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
use serde_json::Value;

use crate::{
metadata::SchemaVersion,
utils::{self, arrow::get_field},
};
use crate::{metadata::SchemaVersion, utils::arrow::get_field};

use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};
use super::DEFAULT_TIMESTAMP_KEY;

pub mod json;

static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];

type Tags = String;
type Metadata = String;
type EventSchema = Vec<Arc<Field>>;

/// Source of the logs, used to perform special processing for certain sources
Expand Down Expand Up @@ -87,7 +81,7 @@ pub trait EventFormat: Sized {
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
) -> Result<(Self::Data, EventSchema, bool), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

Expand All @@ -99,26 +93,14 @@ pub trait EventFormat: Sized {
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
let (data, mut schema, is_first) = self.to_data(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
log_source,
)?;

// DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
};

if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
));
};

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
Expand All @@ -136,16 +118,6 @@ pub trait EventFormat: Sized {
)),
);

// p_tags and p_metadata are added to the end of the schema
let tags_index = schema.len();
let metadata_index = tags_index + 1;
schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
schema.push(Arc::new(Field::new(
DEFAULT_METADATA_KEY,
DataType::Utf8,
true,
)));

// prepare the record batch and new fields to be added
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
Expand All @@ -154,16 +126,6 @@ pub trait EventFormat: Sized {
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
let rb = Self::decode(data, new_schema.clone())?;
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
let metadata_arr =
StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
// modify the record batch to add fields to respective indexes
let rb = utils::arrow::replace_columns(
Arc::clone(&new_schema),
&rb,
&[tags_index, metadata_index],
&[Arc::new(tags_arr), Arc::new(metadata_arr)],
);

Ok((rb, is_first))
}
Expand Down
2 changes: 0 additions & 2 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ use chrono::NaiveDateTime;
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
pub const DEFAULT_METADATA_KEY: &str = "p_metadata";

#[derive(Clone)]
pub struct Event {
Expand Down
Loading
Loading