diff --git a/README.md b/README.md
index 8d852c6e6..16f612d94 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,7 @@

+ commits activity monthly join slack Github stars
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index 48bcc8188..b5c8b637d 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -166,7 +166,6 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
 
     // Proceed to create log stream if it doesn't exist
     if s3.get_schema(&stream_name).await.is_err() {
-        metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
         // Fail if unable to create log stream on object store backend
         if let Err(e) = s3.create_stream(&stream_name).await {
             // delete the stream from metadata because we couldn't create it on object store backend
@@ -180,6 +179,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
             }
             .to_http();
         }
+        metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
         return response::ServerResponse {
             msg: format!("created log stream {}", stream_name),
             code: StatusCode::OK,
diff --git a/server/src/main.rs b/server/src/main.rs
index a5830375c..9fd363ff3 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -28,6 +28,7 @@ use filetime::FileTime;
 use log::warn;
 use rustls::{Certificate, PrivateKey, ServerConfig};
 use rustls_pemfile::{certs, pkcs8_private_keys};
+use std::env;
 use thread_priority::{ThreadBuilder, ThreadPriority};
 
 include!(concat!(env!("OUT_DIR"), "/generated.rs"));
@@ -133,7 +134,7 @@ fn startup_sync() {
         let path = dir.data_path.join("data.records");
 
         // metadata.modified gives us system time
-        // This may not work on all platfomns
+        // This may not work on all platforms
         let metadata = match fs::metadata(&path) {
             Ok(meta) => meta,
             Err(err) => {
diff --git a/server/src/s3.rs b/server/src/s3.rs
index 2708d6495..5c4cd1fa2 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -22,6 +22,7 @@ use futures::StreamExt;
 use http::Uri;
 use object_store::aws::AmazonS3Builder;
 use object_store::limit::LimitStore;
+use serde::{Deserialize, Serialize};
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
@@ -46,6 +47,20 @@ const S3_URL_ENV_VAR: &str = "P_S3_URL";
 // max concurrent request allowed for datafusion object store
 const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ObjectStoreFormat {
+    #[serde(rename = "objectstore-format")]
+    pub version: String,
+}
+
+impl ObjectStoreFormat {
+    pub fn new() -> Self {
+        Self {
+            version: "v1".to_string(),
+        }
+    }
+}
+
 lazy_static::lazy_static! {
     #[derive(Debug)]
     pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::parse());
@@ -195,7 +210,10 @@ impl S3 {
         Ok(())
     }
 
-    async fn _create_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> {
+    async fn _create_stream(&self, stream_name: &str, format: Vec<u8>) -> Result<(), AwsSdkError> {
+        // create ./schema empty file in the stream-name prefix
+        // this indicates that the stream has been created
+        // but doesn't have any content yet
         let _resp = self
             .client
             .put_object()
@@ -203,6 +221,18 @@
             .key(format!("{}/.schema", stream_name))
             .send()
             .await?;
+        // create .parseable.json file in the stream-name prefix.
+        // This indicates the format version for this stream.
+        // This is helpful in case we may change the backend format
+        // in the future
+        let _resp = self
+            .client
+            .put_object()
+            .bucket(&S3_CONFIG.s3_bucket_name)
+            .key(format!("{}/.parseable.json", stream_name))
+            .body(format.into())
+            .send()
+            .await?;
         // Prefix created on S3, now create the directory in
         // the local storage as well
         let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
@@ -357,7 +387,9 @@ impl ObjectStorage for S3 {
     }
 
     async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
-        self._create_stream(stream_name).await?;
+        let format = ObjectStoreFormat::new();
+        let body = serde_json::to_vec(&format)?;
+        self._create_stream(stream_name, body).await?;
         Ok(())
     }
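
Note on the logstream.rs change: registering the stream in STREAM_INFO only after s3.create_stream succeeds means a backend failure can no longer leave a stale in-memory entry behind. A minimal standalone sketch of that ordering, with hypothetical stand-ins for STREAM_INFO and the S3 backend:

use std::collections::HashSet;

// Hypothetical stand-in for the object store backend call.
fn create_stream_on_backend(name: &str) -> Result<(), String> {
    if name.is_empty() {
        return Err("could not create stream on backend".to_string());
    }
    Ok(())
}

// Hypothetical stand-in for the put handler: backend first, metadata second.
fn put_stream(stream_info: &mut HashSet<String>, name: &str) -> Result<(), String> {
    // If this fails we return early and never touch the in-memory map,
    // so there is nothing to roll back.
    create_stream_on_backend(name)?;
    stream_info.insert(name.to_string());
    Ok(())
}

fn main() {
    let mut stream_info = HashSet::new();
    assert!(put_stream(&mut stream_info, "").is_err());
    assert!(stream_info.is_empty()); // no stale entry after the failure
    assert!(put_stream(&mut stream_info, "app-logs").is_ok());
}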
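Note on the new .parseable.json marker: its body is just the serialized ObjectStoreFormat. A minimal round-trip sketch using only serde/serde_json, mirroring the struct added to s3.rs in this patch:

use serde::{Deserialize, Serialize};

// Mirrors the struct added to server/src/s3.rs.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObjectStoreFormat {
    #[serde(rename = "objectstore-format")]
    pub version: String,
}

fn main() -> serde_json::Result<()> {
    // Serialize the marker the same way create_stream does before uploading.
    let format = ObjectStoreFormat { version: "v1".to_string() };
    let body = serde_json::to_string(&format)?;
    assert_eq!(body, r#"{"objectstore-format":"v1"}"#);

    // Deserializing recovers the version, e.g. when inspecting an existing stream.
    let parsed: ObjectStoreFormat = serde_json::from_str(&body)?;
    assert_eq!(parsed.version, "v1");
    Ok(())
}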
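The patch only writes the marker; nothing reads it back yet. A sketch of what a reader might look like, assuming the same aws-sdk-s3 client used elsewhere in s3.rs (get_object plus ByteStream::collect); the helper name is hypothetical and not part of this change:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct ObjectStoreFormat {
    #[serde(rename = "objectstore-format")]
    pub version: String,
}

// Hypothetical reader: fetch a stream's .parseable.json marker and parse
// out its format version, so future code can branch on the backend format.
async fn stream_format(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    stream_name: &str,
) -> Result<ObjectStoreFormat, Box<dyn std::error::Error>> {
    let resp = client
        .get_object()
        .bucket(bucket)
        .key(format!("{}/.parseable.json", stream_name))
        .send()
        .await?;
    // The marker is a few bytes, so collecting the streaming body is cheap.
    let bytes = resp.body.collect().await?.into_bytes();
    Ok(serde_json::from_slice(&bytes)?)
}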