From bdb7edcf4214912be2ac6b016c497e71c38511bf Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Wed, 14 Dec 2022 21:46:09 +0530 Subject: [PATCH 1/4] Temp --- server/src/main.rs | 2 +- server/src/s3.rs | 108 ++++++++++++++++++++++++++++++++++++++------- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index b0c8aeacf..d1870c824 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -293,7 +293,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { web::resource(stats_path("{logstream}")) .route(web::get().to(handlers::logstream::get_stats)), ) - // GET "/liveness" ==> Livenss check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command + // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness))) // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes .service(web::resource(readiness_path()).route(web::get().to(handlers::readiness))) diff --git a/server/src/s3.rs b/server/src/s3.rs index ca732d318..f1746a197 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -26,6 +26,7 @@ use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; use aws_smithy_async::rt::sleep::default_async_sleep; use bytes::Bytes; use clap::builder::ArgPredicate; +use chrono::Local; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ @@ -58,23 +59,94 @@ const DEFAULT_S3_BUCKET: &str = "parseable"; const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin"; const DEFAULT_S3_SECRET_KEY: &str = "minioadmin"; +// metadata file names in a Stream prefix +const METADATA_FILE_NAME: &str = ".metadata.json"; +const SCHEMA_FILE_NAME: &str = ".schema"; +const ALERT_FILE_NAME: &str = ".alert.json"; + // max concurrent request allowed for datafusion object store const MAX_OBJECT_STORE_REQUESTS: usize = 1000; +// all the supported permissions +// const PERMISSIONS_READ: &str = "readonly"; +// const PERMISSIONS_WRITE: &str = "writeonly"; +// const PERMISSIONS_DELETE: &str = "delete"; +// const PERMISSIONS_READ_WRITE: &str = "readwrite"; +const PERMISSIONS_ALL: &str = "all"; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ObjectStoreFormat { - #[serde(rename = "objectstore-format")] pub version: String, + #[serde(rename = "objectstore-format")] + pub objectstore_format: String, + #[serde(rename = "created-at")] + pub created_at: String, + pub owner: Owner, + pub access: Access +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Owner { + pub id: String, + pub group: String, +} + +impl Owner { + pub fn new(id: String, group: String) -> Self { + Self { id, group } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Access { + pub objects: Vec, +} + +impl Access { + pub fn new(objects: Vec) -> Self { + Self { objects } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AccessObject { + pub id: String, + pub group: String, + pub permissions: Vec, +} + +impl AccessObject { + pub fn new(id: String) -> Self { + Self { + id: id.clone(), + group: id, + permissions: vec![PERMISSIONS_ALL.to_string()], + } + } } impl Default for ObjectStoreFormat { fn default() -> Self { Self { version: "v1".to_string(), + objectstore_format: "v1".to_string(), + created_at: Local::now().to_rfc3339(), + owner: Owner::new("".to_string(), "".to_string()), + access: Access::new(vec![]), } } } +impl ObjectStoreFormat { + fn set_id(&mut self, id: String) { + self.owner.id = id.clone(); + self.owner.group = id; + } + fn set_access(&mut self, access: Vec) { + self.access.objects = access; + } +} + lazy_static::lazy_static! { #[derive(Debug)] pub static ref S3_CONFIG: Arc = Arc::new(CONFIG.storage().clone()); @@ -219,7 +291,7 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.schema", stream_name)) + .key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME)) .body(body.into_bytes().into()) .send() .await?; @@ -235,17 +307,17 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.schema", stream_name)) + .key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME)) .send() .await?; - self._put_parseable_config(stream_name, format).await?; + self._put_stream_meta(stream_name, format).await?; // Prefix created on S3, now create the directory in // the local storage as well let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name)); Ok(()) } - async fn _put_parseable_config( + async fn _put_stream_meta( &self, stream_name: &str, body: Vec, @@ -254,7 +326,7 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.parseable.json", stream_name)) + .key(format!("{}/{}", stream_name, METADATA_FILE_NAME)) .body(body.into()) .send() .await?; @@ -297,7 +369,7 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.alert.json", stream_name)) + .key(format!("{}/{}", stream_name, ALERT_FILE_NAME)) .body(body.into()) .send() .await?; @@ -306,15 +378,15 @@ impl S3 { } async fn _get_schema(&self, stream_name: &str) -> Result { - self._get(stream_name, "schema").await + self._get(stream_name, SCHEMA_FILE_NAME).await } async fn _alert_exists(&self, stream_name: &str) -> Result { - self._get(stream_name, "alert.json").await + self._get(stream_name, ALERT_FILE_NAME).await } - async fn _get_parseable_config(&self, stream_name: &str) -> Result { - self._get(stream_name, "parseable.json").await + async fn _get_stream_meta(&self, stream_name: &str) -> Result { + self._get(stream_name, METADATA_FILE_NAME).await } async fn _get(&self, stream_name: &str, resource: &str) -> Result { @@ -322,7 +394,7 @@ impl S3 { .client .get_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.{}", stream_name, resource)) + .key(format!("{}/{}", stream_name, resource)) .send() .await?; let body = resp.body.collect().await; @@ -414,7 +486,11 @@ impl ObjectStorage for S3 { } async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { - let format = ObjectStoreFormat::default(); + let mut format = ObjectStoreFormat::default(); + format.set_id(CONFIG.parseable.username.clone()); + let access_object = AccessObject::new(CONFIG.parseable.username.clone()); + format.set_access(vec![access_object]); + let body = serde_json::to_vec(&format)?; self._create_stream(stream_name, body).await?; @@ -440,13 +516,13 @@ impl ObjectStorage for S3 { async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); - let parseable_metadata = self._get_parseable_config(stream_name).await?; + let parseable_metadata = self._get_stream_meta(stream_name).await?; let mut parseable_metadata: Value = serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); parseable_metadata["stats"] = stats; - self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes()) + self._put_stream_meta(stream_name, parseable_metadata.to_string().into_bytes()) .await?; Ok(()) } @@ -470,7 +546,7 @@ impl ObjectStorage for S3 { } async fn get_stats(&self, stream_name: &str) -> Result { - let parseable_metadata = self._get_parseable_config(stream_name).await?; + let parseable_metadata = self._get_stream_meta(stream_name).await?; let parseable_metadata: Value = serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); From 5d06a69d462e856df3df3976eb797d48063fbef8 Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Wed, 14 Dec 2022 22:05:58 +0530 Subject: [PATCH 2/4] temp --- server/src/s3.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/s3.rs b/server/src/s3.rs index f1746a197..f159cbb3c 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -25,8 +25,8 @@ use aws_sdk_s3::RetryConfig; use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; use aws_smithy_async::rt::sleep::default_async_sleep; use bytes::Bytes; -use clap::builder::ArgPredicate; use chrono::Local; +use clap::builder::ArgPredicate; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ @@ -82,7 +82,7 @@ pub struct ObjectStoreFormat { #[serde(rename = "created-at")] pub created_at: String, pub owner: Owner, - pub access: Access + pub access: Access, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -317,11 +317,7 @@ impl S3 { Ok(()) } - async fn _put_stream_meta( - &self, - stream_name: &str, - body: Vec, - ) -> Result<(), AwsSdkError> { + async fn _put_stream_meta(&self, stream_name: &str, body: Vec) -> Result<(), AwsSdkError> { let _resp = self .client .put_object() From 99b5bf349c42c5a2bd8626267a7bced369f43a2e Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Wed, 14 Dec 2022 22:07:31 +0530 Subject: [PATCH 3/4] Fix deepsource --- server/src/s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/s3.rs b/server/src/s3.rs index f159cbb3c..6f9806a3a 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -139,7 +139,7 @@ impl Default for ObjectStoreFormat { impl ObjectStoreFormat { fn set_id(&mut self, id: String) { - self.owner.id = id.clone(); + self.owner.id = id.clone_from()(); self.owner.group = id; } fn set_access(&mut self, access: Vec) { From 4b783a3180cf47d6ae6736fb063406cce0d3b375 Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Wed, 14 Dec 2022 22:30:02 +0530 Subject: [PATCH 4/4] Fix --- server/src/s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/s3.rs b/server/src/s3.rs index 6f9806a3a..f987be2da 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -139,7 +139,7 @@ impl Default for ObjectStoreFormat { impl ObjectStoreFormat { fn set_id(&mut self, id: String) { - self.owner.id = id.clone_from()(); + self.owner.id.clone_from(&id); self.owner.group = id; } fn set_access(&mut self, access: Vec) {