Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
</p>

<p align="center">
<a href="https://fossunited.org/" target="_blank"><img src="http://fossunited.org/files/fossunited-badge.svg"></a>
<img src="https://img.shields.io/github/commit-activity/m/parseablehq/parseable" alt="commits activity monthly">
<a href="https://launchpass.com/parseable" target="_blank"><img src="https://img.shields.io/badge/join%20slack-parseable-brightgreen.svg" alt="join slack"></a>
<a href="https://github.com/parseablehq/parseable/stargazers" target="_blank"><img src="https://img.shields.io/github/stars/parseablehq/parseable?style=social" alt="Github stars"></a>
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ pub async fn put(req: HttpRequest) -> HttpResponse {

// Proceed to create log stream if it doesn't exist
if s3.get_schema(&stream_name).await.is_err() {
metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
// Fail if unable to create log stream on object store backend
if let Err(e) = s3.create_stream(&stream_name).await {
// delete the stream from metadata because we couldn't create it on object store backend
Expand All @@ -180,6 +179,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
}
.to_http();
}
metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
return response::ServerResponse {
msg: format!("created log stream {}", stream_name),
code: StatusCode::OK,
Expand Down
3 changes: 2 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use filetime::FileTime;
use log::warn;
use rustls::{Certificate, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, pkcs8_private_keys};
use std::env;
use thread_priority::{ThreadBuilder, ThreadPriority};

include!(concat!(env!("OUT_DIR"), "/generated.rs"));
Expand Down Expand Up @@ -133,7 +134,7 @@ fn startup_sync() {
let path = dir.data_path.join("data.records");

// metadata.modified gives us system time
// This may not work on all platfomns
// This may not work on all platforms
let metadata = match fs::metadata(&path) {
Ok(meta) => meta,
Err(err) => {
Expand Down
36 changes: 34 additions & 2 deletions server/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::StreamExt;
use http::Uri;
use object_store::aws::AmazonS3Builder;
use object_store::limit::LimitStore;
use serde::{Deserialize, Serialize};
use std::fs;
use std::iter::Iterator;
use std::sync::Arc;
Expand All @@ -46,6 +47,20 @@ const S3_URL_ENV_VAR: &str = "P_S3_URL";
// max concurrent request allowed for datafusion object store
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObjectStoreFormat {
#[serde(rename = "objectstore-format")]
pub version: String,
}

impl ObjectStoreFormat {
pub fn new() -> Self {
Self {
version: "v1".to_string(),
}
}
}

lazy_static::lazy_static! {
#[derive(Debug)]
pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::parse());
Expand Down Expand Up @@ -195,14 +210,29 @@ impl S3 {
Ok(())
}

async fn _create_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> {
async fn _create_stream(&self, stream_name: &str, format: Vec<u8>) -> Result<(), AwsSdkError> {
// create ./schema empty file in the stream-name prefix
// this indicates that the stream has been created
// but doesn't have any content yet
let _resp = self
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.schema", stream_name))
.send()
.await?;
// create .parseable.json file in the stream-name prefix.
// This indicates the format version for this stream.
// This is helpful in case we may change the backend format
// in the future
let _resp = self
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.parseable.json", stream_name))
.body(format.into())
.send()
.await?;
// Prefix created on S3, now create the directory in
// the local storage as well
let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
Expand Down Expand Up @@ -357,7 +387,9 @@ impl ObjectStorage for S3 {
}

async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
self._create_stream(stream_name).await?;
let format = ObjectStoreFormat::new();
let body = serde_json::to_vec(&format)?;
self._create_stream(stream_name, body).await?;

Ok(())
}
Expand Down