2 changes: 1 addition & 1 deletion server/src/main.rs
@@ -293,7 +293,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
web::resource(stats_path("{logstream}"))
.route(web::get().to(handlers::logstream::get_stats)),
)
// GET "/liveness" ==> Livenss check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
// GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
.service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
// GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
.service(web::resource(readiness_path()).route(web::get().to(handlers::readiness)))
112 changes: 92 additions & 20 deletions server/src/s3.rs
@@ -25,6 +25,7 @@ use aws_sdk_s3::RetryConfig;
use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
use aws_smithy_async::rt::sleep::default_async_sleep;
use bytes::Bytes;
use chrono::Local;
use clap::builder::ArgPredicate;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -58,23 +59,94 @@ const DEFAULT_S3_BUCKET: &str = "parseable";
const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";

// metadata file names in a Stream prefix
const METADATA_FILE_NAME: &str = ".metadata.json";
const SCHEMA_FILE_NAME: &str = ".schema";
const ALERT_FILE_NAME: &str = ".alert.json";
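
// Not part of the diff: a quick sketch of how these constants combine with a
// stream name to form the per-stream object keys used by the put_object /
// get_object calls further down. The stream name "app_logs" is a placeholder.
// let stream_name = "app_logs";
// assert_eq!(format!("{}/{}", stream_name, METADATA_FILE_NAME), "app_logs/.metadata.json");
// assert_eq!(format!("{}/{}", stream_name, SCHEMA_FILE_NAME), "app_logs/.schema");
// assert_eq!(format!("{}/{}", stream_name, ALERT_FILE_NAME), "app_logs/.alert.json");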

// max concurrent requests allowed for datafusion object store
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

// all the supported permissions
// const PERMISSIONS_READ: &str = "readonly";
// const PERMISSIONS_WRITE: &str = "writeonly";
// const PERMISSIONS_DELETE: &str = "delete";
// const PERMISSIONS_READ_WRITE: &str = "readwrite";
const PERMISSIONS_ALL: &str = "all";

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObjectStoreFormat {
#[serde(rename = "objectstore-format")]
pub version: String,
#[serde(rename = "objectstore-format")]
pub objectstore_format: String,
#[serde(rename = "created-at")]
pub created_at: String,
pub owner: Owner,
pub access: Access,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Owner {
pub id: String,
pub group: String,
}

impl Owner {
pub fn new(id: String, group: String) -> Self {
Self { id, group }
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Access {
pub objects: Vec<AccessObject>,
}

impl Access {
pub fn new(objects: Vec<AccessObject>) -> Self {
Self { objects }
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AccessObject {
pub id: String,
pub group: String,
pub permissions: Vec<String>,
}

impl AccessObject {
pub fn new(id: String) -> Self {
Self {
id: id.clone(),
group: id,
permissions: vec![PERMISSIONS_ALL.to_string()],
}
}
}

impl Default for ObjectStoreFormat {
fn default() -> Self {
Self {
version: "v1".to_string(),
objectstore_format: "v1".to_string(),
created_at: Local::now().to_rfc3339(),
owner: Owner::new("".to_string(), "".to_string()),
access: Access::new(vec![]),
}
}
}

impl ObjectStoreFormat {
fn set_id(&mut self, id: String) {
self.owner.id.clone_from(&id);
self.owner.group = id;
}
fn set_access(&mut self, access: Vec<AccessObject>) {
self.access.objects = access;
}
}
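
// Not part of the diff: a minimal sketch of how these types fit together,
// mirroring what create_stream does below. The username "admin" is a
// placeholder, and set_id/set_access are private, so this only applies
// inside this module.
//
// let mut format = ObjectStoreFormat::default();
// format.set_id("admin".to_string());
// format.set_access(vec![AccessObject::new("admin".to_string())]);
// let _body = serde_json::to_vec(&format).expect("stream metadata is serializable");
//
// Serializing should yield roughly:
// {
//   "version": "v1",
//   "objectstore-format": "v1",
//   "created-at": "<RFC 3339 timestamp>",
//   "owner": { "id": "admin", "group": "admin" },
//   "access": { "objects": [ { "id": "admin", "group": "admin", "permissions": ["all"] } ] }
// }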

lazy_static::lazy_static! {
#[derive(Debug)]
pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(CONFIG.storage().clone());
@@ -219,7 +291,7 @@ impl S3 {
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.schema", stream_name))
.key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME))
.body(body.into_bytes().into())
.send()
.await?;
@@ -235,26 +307,22 @@
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.schema", stream_name))
.key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME))
.send()
.await?;
self._put_parseable_config(stream_name, format).await?;
self._put_stream_meta(stream_name, format).await?;
// Prefix created on S3, now create the directory in
// the local storage as well
let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
Ok(())
}

async fn _put_parseable_config(
&self,
stream_name: &str,
body: Vec<u8>,
) -> Result<(), AwsSdkError> {
async fn _put_stream_meta(&self, stream_name: &str, body: Vec<u8>) -> Result<(), AwsSdkError> {
let _resp = self
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.parseable.json", stream_name))
.key(format!("{}/{}", stream_name, METADATA_FILE_NAME))
.body(body.into())
.send()
.await?;
Expand Down Expand Up @@ -297,7 +365,7 @@ impl S3 {
.client
.put_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.alert.json", stream_name))
.key(format!("{}/{}", stream_name, ALERT_FILE_NAME))
.body(body.into())
.send()
.await?;
@@ -306,23 +374,23 @@
}

async fn _get_schema(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
self._get(stream_name, "schema").await
self._get(stream_name, SCHEMA_FILE_NAME).await
}

async fn _alert_exists(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
self._get(stream_name, "alert.json").await
self._get(stream_name, ALERT_FILE_NAME).await
}

async fn _get_parseable_config(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
self._get(stream_name, "parseable.json").await
async fn _get_stream_meta(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
self._get(stream_name, METADATA_FILE_NAME).await
}

async fn _get(&self, stream_name: &str, resource: &str) -> Result<Bytes, AwsSdkError> {
let resp = self
.client
.get_object()
.bucket(&S3_CONFIG.s3_bucket_name)
.key(format!("{}/.{}", stream_name, resource))
.key(format!("{}/{}", stream_name, resource))
.send()
.await?;
let body = resp.body.collect().await;
@@ -414,7 +482,11 @@ impl ObjectStorage for S3 {
}

async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
let format = ObjectStoreFormat::default();
let mut format = ObjectStoreFormat::default();
format.set_id(CONFIG.parseable.username.clone());
let access_object = AccessObject::new(CONFIG.parseable.username.clone());
format.set_access(vec![access_object]);

let body = serde_json::to_vec(&format)?;
self._create_stream(stream_name, body).await?;

@@ -440,13 +512,13 @@

async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
let parseable_metadata = self._get_parseable_config(stream_name).await?;
let parseable_metadata = self._get_stream_meta(stream_name).await?;
let mut parseable_metadata: Value =
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");

parseable_metadata["stats"] = stats;

self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
self._put_stream_meta(stream_name, parseable_metadata.to_string().into_bytes())
.await?;
Ok(())
}
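
// For context, not part of the change: put_stats above is a read-modify-write
// over the stream's .metadata.json, merging the stats under a top-level
// "stats" key. A rough sketch of that merge; the stats field names here are
// invented purely for illustration.
//
// let existing: &[u8] = br#"{"version":"v1","objectstore-format":"v1"}"#;
// let mut meta: serde_json::Value = serde_json::from_slice(existing).expect("valid json");
// meta["stats"] = serde_json::json!({ "events": 42, "size": 1024 });
// meta.to_string().into_bytes() is what then gets written back via _put_stream_meta.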
@@ -470,7 +542,7 @@
}

async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
let parseable_metadata = self._get_parseable_config(stream_name).await?;
let parseable_metadata = self._get_stream_meta(stream_name).await?;
let parseable_metadata: Value =
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");

Expand Down