103 changes: 86 additions & 17 deletions server/src/event.rs
@@ -16,6 +16,7 @@
*
*
*/
use actix_web::rt::spawn;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::ArrowError;
@@ -24,17 +25,19 @@ use datafusion::arrow::json;
use datafusion::arrow::json::reader::infer_json_schema;
use datafusion::arrow::record_batch::RecordBatch;
use lazy_static::lazy_static;
use log::error;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::BufReader;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::RwLock;

use crate::metadata;
use crate::metadata::LOCK_EXPECT;
use crate::option::CONFIG;
use crate::s3;
use crate::storage::ObjectStorage;

use self::error::EventError;
@@ -190,7 +193,7 @@ pub struct Event {
// Events holds the schema related to each event for a single log stream

impl Event {
pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
pub async fn process(&self) -> Result<(), EventError> {
let inferred_schema = self.infer_schema()?;

let event = self.get_reader(inferred_schema.clone());
@@ -207,12 +210,11 @@ impl Event {
} else {
// if stream schema is none then it is the first event,
// process the first event and store the schema in the object store
self.process_first_event(event, inferred_schema, storage)
.await?
self.process_first_event::<s3::S3, _>(event, inferred_schema)?
};

if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
error!("Error checking for alerts. {:?}", e);
log::error!("Error checking for alerts. {:?}", e);
}

Ok(())
@@ -221,25 +223,67 @@ impl Event {
// This is called when the first event of a log stream is received. The first event is
// special because we parse this event to generate the schema for the log stream. This
// schema is then enforced on the rest of the events sent to this log stream.
async fn process_first_event<R: std::io::Read>(
fn process_first_event<S: ObjectStorage, R: std::io::Read>(
&self,
mut event: json::Reader<R>,
schema: Schema,
storage: &impl ObjectStorage,
) -> Result<u64, EventError> {
let rb = event.next()?.ok_or(EventError::MissingRecord)?;
let stream_name = &self.stream_name;

// Store record batch on local cache
STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
// note for functions _schema_with_map and _set_schema_with_map,
// these are to be called while holding a write lock specifically.
// this guarantees two things
// - no other metadata operation can happen in between
// - the map always has an entry for this stream

// Put the inferred schema to object store
storage.put_schema(stream_name.clone(), &schema).await?;
let stream_name = &self.stream_name;

// set the schema in memory for this stream
metadata::STREAM_INFO.set_schema(stream_name, schema)?;
let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT);
// if the schema is not none after acquiring the lock,
// then some other thread has already completed this function.
if _schema_with_map(stream_name, &stream_metadata).is_some() {
// drop the lock
drop(stream_metadata);
// Try to post the event the usual way
log::info!("first event is redirected to process_event");
self.process_event(event)
} else {
// stream schema is still none,
// this means this execution should be considered the first event.

// Store record batch on local cache
log::info!("creating local writer for this first event");
let rb = event.next()?.ok_or(EventError::MissingRecord)?;
STREAM_WRITERS::append_to_local(stream_name, &rb)?;

log::info!("schema is set in memory map for logstream {}", stream_name);
_set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata);
// drop mutex before going across await point
drop(stream_metadata);

log::info!(
"setting schema on objectstore for logstream {}",
stream_name
);
let storage = S::new();

let stream_name = stream_name.clone();
spawn(async move {
if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
// If this call fails then currently there is no right way to make the local state consistent.
// This needs a fix once stronger safety guarantees are provided by localwriter and s3_sync.
// Reasoning -
// - After dropping lock many events may process through
// - Processed events may sync before metadata deletion
log::error!(
"Parseable failed to upload schema to objectstore due to error {}",
e
);
log::error!("Please manually delete this logstream and create a new one.");
metadata::STREAM_INFO.delete_stream(&stream_name);
}
});

Ok(0)
Ok(0)
}
}
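
The branch above is a double-checked locking pattern: the schema is checked again after taking the write lock, so exactly one thread performs first-event setup while losers of the race fall through to the normal `process_event` path. A minimal, self-contained sketch of the same idea, with an in-memory map standing in for `STREAM_INFO` (names here are hypothetical, and std's `LazyLock`, Rust 1.80+, stands in for the `lazy_static` the server uses):

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, RwLock};

// Hypothetical stand-in for STREAM_INFO: stream name -> optional schema.
static STREAMS: LazyLock<RwLock<HashMap<String, Option<String>>>> =
    LazyLock::new(|| RwLock::new(HashMap::new()));

// Returns true if this caller won the race and performed first-event setup.
fn try_claim_first_event(stream: &str, inferred: &str) -> bool {
    // Hold the write lock across both the check and the set, so no other
    // metadata operation can interleave between them.
    let mut map = STREAMS.write().expect("lock poisoned");
    match map.get_mut(stream) {
        Some(slot) if slot.is_none() => {
            *slot = Some(inferred.to_owned());
            true // we won: the caller now spawns the schema upload
        }
        // Schema already set (or stream unknown): fall through to the
        // normal event path, like the redirect to process_event above.
        _ => false,
    }
    // The write lock drops here, before any await point is crossed.
}

fn main() {
    STREAMS.write().unwrap().insert("app-logs".into(), None);
    assert!(try_claim_first_event("app-logs", "{inferred schema}"));
    assert!(!try_claim_first_event("app-logs", "{inferred schema}"));
}
```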

// process_event handles all events after the 1st event. Concatenates record batches
@@ -273,6 +317,31 @@ impl Event {
}
}
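
As the comment above notes, every event after the first is appended by concatenating its record batch with what the local writer already holds. A sketch of that operation using the standalone arrow crate (the server reaches the same types through `datafusion::arrow`; `concat_batches` lives in `arrow::compute` in recent releases, so treat the exact path as an assumption):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // One-column schema standing in for an inferred log stream schema.
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));

    let first = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let next = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![3])) as ArrayRef],
    )?;

    // Concatenate the new batch onto the cached one, as process_event does.
    let merged = concat_batches(&schema, &[first, next])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}
```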

// Special function which reads from the metadata map while holding the lock
#[inline]
pub fn _schema_with_map(
stream_name: &str,
map: &impl Deref<Target = HashMap<String, metadata::LogStreamMetadata>>,
) -> Option<Schema> {
map.get(stream_name)
.expect("map has entry for this stream name")
.schema
.to_owned()
}

#[inline]
// Special function which writes to the metadata map while holding the lock
pub fn _set_schema_with_map(
stream_name: &str,
schema: Schema,
map: &mut impl DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>,
) {
map.get_mut(stream_name)
.expect("map has entry for this stream name")
.schema
.replace(schema);
}
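
These helpers rely on the lock guard types implementing `Deref`/`DerefMut`: the read helper accepts anything that derefs to the map (a read or write guard), while the `DerefMut` bound on the write helper can only be satisfied by a write guard, which enforces part of the locking contract at the type level. A small self-contained sketch of the same trick with simplified types (all names hypothetical):

```rust
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::RwLock;

// Read helper: any guard that derefs to the map is accepted,
// so it works with both read and write guards.
fn peek(map: &impl Deref<Target = HashMap<String, u64>>, key: &str) -> Option<u64> {
    map.get(key).copied()
}

// Write helper: the DerefMut bound restricts callers to a write guard.
fn bump(map: &mut impl DerefMut<Target = HashMap<String, u64>>, key: &str) {
    *map.entry(key.to_owned()).or_insert(0) += 1;
}

fn main() {
    let lock = RwLock::new(HashMap::new());

    // Both helpers run under one write guard, so no other thread
    // can interleave between the write and the read-back.
    let mut guard = lock.write().unwrap();
    bump(&mut guard, "events");
    assert_eq!(peek(&guard, "events"), Some(1));
    drop(guard);

    // A plain read guard satisfies the Deref-only bound too.
    let guard = lock.read().unwrap();
    assert_eq!(peek(&guard, "events"), Some(1));
}
```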

pub mod error {
use crate::metadata::error::stream_info::MetadataError;
use crate::storage::ObjectStorageError;
8 changes: 3 additions & 5 deletions server/src/handlers/event.rs
@@ -64,20 +64,18 @@ pub async fn post_event(
collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?,
)]);

let s3 = S3::new();

if let Some(array) = body.as_array() {
for body in array {
let body = merge(body.clone(), metadata.clone());
let body = merge(body, tags.clone());
let body = flatten_json_body(web::Json(body)).unwrap();

let e = event::Event {
let event = event::Event {
body,
stream_name: stream_name.clone(),
};

e.process(&s3).await?;
event.process().await?;
}
} else {
let body = merge(body.clone(), metadata);
@@ -88,7 +86,7 @@
stream_name,
};

event.process(&s3).await?;
event.process().await?;
}

Ok(HttpResponse::Ok().finish())
2 changes: 0 additions & 2 deletions server/src/handlers/logstream.rs
@@ -168,8 +168,6 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
if s3.get_schema(&stream_name).await.is_err() {
// Fail if unable to create log stream on object store backend
if let Err(e) = s3.create_stream(&stream_name).await {
// delete the stream from metadata because we couldn't create it on object store backend
metadata::STREAM_INFO.delete_stream(&stream_name);
return response::ServerResponse {
msg: format!(
"failed to create log stream {} due to err: {}",
1 change: 1 addition & 0 deletions server/src/metadata.rs
@@ -90,6 +90,7 @@ impl STREAM_INFO {
Ok(())
}

#[allow(dead_code)]
pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
4 changes: 4 additions & 0 deletions server/src/s3.rs
@@ -365,6 +365,10 @@ impl S3 {

#[async_trait]
impl ObjectStorage for S3 {
fn new() -> Self {
Self::new()
}

async fn check(&self) -> Result<(), ObjectStorageError> {
self.client
.head_bucket()
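
One detail worth noting in the trait impl above: `Self::new()` does not recurse into the trait method. Rust resolves associated-function paths to inherent impls before trait impls, so the call lands on the inherent constructor in the `impl S3` block shown in the hunk header. A minimal sketch of that resolution rule (simplified types, hypothetical names):

```rust
// The trait constructor, analogous to ObjectStorage::new.
trait Builder: Sized {
    fn new() -> Self;
}

struct S3;

impl S3 {
    // Inherent constructor holding the real logic.
    fn new() -> Self {
        S3
    }
}

impl Builder for S3 {
    fn new() -> Self {
        // Resolves to the inherent S3::new above (inherent associated
        // functions take precedence over trait ones), so no recursion.
        Self::new()
    }
}

// Generic callers go through the trait, as process_first_event::<S, _> does.
fn make<B: Builder>() -> B {
    B::new()
}

fn main() {
    let _storage: S3 = make();
}
```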
1 change: 1 addition & 0 deletions server/src/storage.rs
@@ -52,6 +52,7 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60

#[async_trait]
pub trait ObjectStorage: Sync + 'static {
fn new() -> Self;
async fn check(&self) -> Result<(), ObjectStorageError>;
async fn put_schema(
&self,
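
The likely reason the trait grows a `new()` constructor at all is the spawned upload in `process_first_event`: a spawned future must be `'static`, so it cannot capture a borrowed `&impl ObjectStorage` from the handler, and constructing an owned `S` inside the function lets the background task own its storage client outright. A sketch of that ownership constraint, using a plain thread in place of actix's `spawn` so it runs without the runtime (trait and names hypothetical):

```rust
use std::time::Duration;

// Simplified stand-in for the ObjectStorage trait (names hypothetical).
trait Storage: Send + Sync + 'static {
    fn new() -> Self;
    fn put(&self, key: &str);
}

struct Mem;

impl Storage for Mem {
    fn new() -> Self {
        Mem
    }
    fn put(&self, key: &str) {
        println!("stored {key}");
    }
}

fn upload_in_background<S: Storage>(key: String) {
    // Constructing an owned S here means the spawned task borrows nothing
    // from the caller, which is what the 'static bound on spawned futures
    // (or threads) requires.
    let storage = S::new();
    std::thread::spawn(move || {
        storage.put(&key);
    });
}

fn main() {
    upload_in_background::<Mem>("app-logs/.schema".to_owned());
    // Give the background task a moment before the process exits.
    std::thread::sleep(Duration::from_millis(50));
}
```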