diff --git a/server/src/main.rs b/server/src/main.rs index b0c8aeacf..d1870c824 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -293,7 +293,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { web::resource(stats_path("{logstream}")) .route(web::get().to(handlers::logstream::get_stats)), ) - // GET "/liveness" ==> Livenss check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command + // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness))) // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes .service(web::resource(readiness_path()).route(web::get().to(handlers::readiness))) diff --git a/server/src/s3.rs b/server/src/s3.rs index ca732d318..f987be2da 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -25,6 +25,7 @@ use aws_sdk_s3::RetryConfig; use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; use aws_smithy_async::rt::sleep::default_async_sleep; use bytes::Bytes; +use chrono::Local; use clap::builder::ArgPredicate; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -58,23 +59,94 @@ const DEFAULT_S3_BUCKET: &str = "parseable"; const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin"; const DEFAULT_S3_SECRET_KEY: &str = "minioadmin"; +// metadata file names in a Stream prefix +const METADATA_FILE_NAME: &str = ".metadata.json"; +const SCHEMA_FILE_NAME: &str = ".schema"; +const ALERT_FILE_NAME: &str = ".alert.json"; + // max concurrent request allowed for datafusion object store const MAX_OBJECT_STORE_REQUESTS: usize = 1000; +// all the supported permissions +// const PERMISSIONS_READ: &str = "readonly"; +// const PERMISSIONS_WRITE: &str = "writeonly"; +// const PERMISSIONS_DELETE: &str = "delete"; +// const PERMISSIONS_READ_WRITE: &str = "readwrite"; +const PERMISSIONS_ALL: &str = "all"; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ObjectStoreFormat { - #[serde(rename = "objectstore-format")] pub version: String, + #[serde(rename = "objectstore-format")] + pub objectstore_format: String, + #[serde(rename = "created-at")] + pub created_at: String, + pub owner: Owner, + pub access: Access, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Owner { + pub id: String, + pub group: String, +} + +impl Owner { + pub fn new(id: String, group: String) -> Self { + Self { id, group } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Access { + pub objects: Vec, +} + +impl Access { + pub fn new(objects: Vec) -> Self { + Self { objects } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AccessObject { + pub id: String, + pub group: String, + pub permissions: Vec, +} + +impl AccessObject { + pub fn new(id: String) -> Self { + Self { + id: id.clone(), + group: id, + permissions: vec![PERMISSIONS_ALL.to_string()], + } + } } impl Default for ObjectStoreFormat { fn default() -> Self { Self { version: "v1".to_string(), + objectstore_format: "v1".to_string(), + created_at: Local::now().to_rfc3339(), + owner: Owner::new("".to_string(), "".to_string()), + access: Access::new(vec![]), } } } +impl ObjectStoreFormat { + fn set_id(&mut self, id: String) { + self.owner.id.clone_from(&id); + self.owner.group = id; + } + fn set_access(&mut self, access: Vec) { + self.access.objects = access; + } +} + lazy_static::lazy_static! { #[derive(Debug)] pub static ref S3_CONFIG: Arc = Arc::new(CONFIG.storage().clone()); @@ -219,7 +291,7 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.schema", stream_name)) + .key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME)) .body(body.into_bytes().into()) .send() .await?; @@ -235,26 +307,22 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.schema", stream_name)) + .key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME)) .send() .await?; - self._put_parseable_config(stream_name, format).await?; + self._put_stream_meta(stream_name, format).await?; // Prefix created on S3, now create the directory in // the local storage as well let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name)); Ok(()) } - async fn _put_parseable_config( - &self, - stream_name: &str, - body: Vec, - ) -> Result<(), AwsSdkError> { + async fn _put_stream_meta(&self, stream_name: &str, body: Vec) -> Result<(), AwsSdkError> { let _resp = self .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.parseable.json", stream_name)) + .key(format!("{}/{}", stream_name, METADATA_FILE_NAME)) .body(body.into()) .send() .await?; @@ -297,7 +365,7 @@ impl S3 { .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.alert.json", stream_name)) + .key(format!("{}/{}", stream_name, ALERT_FILE_NAME)) .body(body.into()) .send() .await?; @@ -306,15 +374,15 @@ impl S3 { } async fn _get_schema(&self, stream_name: &str) -> Result { - self._get(stream_name, "schema").await + self._get(stream_name, SCHEMA_FILE_NAME).await } async fn _alert_exists(&self, stream_name: &str) -> Result { - self._get(stream_name, "alert.json").await + self._get(stream_name, ALERT_FILE_NAME).await } - async fn _get_parseable_config(&self, stream_name: &str) -> Result { - self._get(stream_name, "parseable.json").await + async fn _get_stream_meta(&self, stream_name: &str) -> Result { + self._get(stream_name, METADATA_FILE_NAME).await } async fn _get(&self, stream_name: &str, resource: &str) -> Result { @@ -322,7 +390,7 @@ impl S3 { .client .get_object() .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/.{}", stream_name, resource)) + .key(format!("{}/{}", stream_name, resource)) .send() .await?; let body = resp.body.collect().await; @@ -414,7 +482,11 @@ impl ObjectStorage for S3 { } async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { - let format = ObjectStoreFormat::default(); + let mut format = ObjectStoreFormat::default(); + format.set_id(CONFIG.parseable.username.clone()); + let access_object = AccessObject::new(CONFIG.parseable.username.clone()); + format.set_access(vec![access_object]); + let body = serde_json::to_vec(&format)?; self._create_stream(stream_name, body).await?; @@ -440,13 +512,13 @@ impl ObjectStorage for S3 { async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); - let parseable_metadata = self._get_parseable_config(stream_name).await?; + let parseable_metadata = self._get_stream_meta(stream_name).await?; let mut parseable_metadata: Value = serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); parseable_metadata["stats"] = stats; - self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes()) + self._put_stream_meta(stream_name, parseable_metadata.to_string().into_bytes()) .await?; Ok(()) } @@ -470,7 +542,7 @@ impl ObjectStorage for S3 { } async fn get_stats(&self, stream_name: &str) -> Result { - let parseable_metadata = self._get_parseable_config(stream_name).await?; + let parseable_metadata = self._get_stream_meta(stream_name).await?; let parseable_metadata: Value = serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");