From 6244babfde911dd1222b0a3ed0bec231c9b2fa0b Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 15 Dec 2022 14:27:27 +0530
Subject: [PATCH] Add option to use local filesystem for storage

This PR enables the Parseable server to accept additional arguments (drive or
s3) to switch between storage backends. The local filesystem behaves like an
object store through an implementation of the ObjectStorage trait. All methods
of LocalFS are implemented using tokio::fs.

Configuration provided at runtime is used to select a storage provider. The
storage object is handled as a trait object, hence the requirement of ?Sized
on some methods. Calls to S3 are replaced with the appropriate methods.
---
 server/Cargo.toml                    |   5 +-
 server/src/event.rs                  |  14 +-
 server/src/handlers/event.rs         |   7 +-
 server/src/handlers/logstream.rs     |  44 +-
 server/src/handlers/mod.rs           |   5 +-
 server/src/main.rs                   |  30 +-
 server/src/metadata.rs               |   4 +-
 server/src/option.rs                 | 105 ++---
 server/src/query.rs                  |  11 +-
 server/src/query/table_provider.rs   |  16 +-
 server/src/s3.rs                     | 636 ---------------------------
 server/src/storage.rs                | 358 +++++----------
 server/src/storage/file_link.rs      | 103 +++++
 server/src/storage/localfs.rs        | 193 ++++++++
 server/src/storage/object_storage.rs | 275 ++++++++++++
 server/src/storage/s3.rs             | 369 ++++++++++++++++
 16 files changed, 1192 insertions(+), 983 deletions(-)
 delete mode 100644 server/src/s3.rs
 create mode 100644 server/src/storage/file_link.rs
 create mode 100644 server/src/storage/localfs.rs
 create mode 100644 server/src/storage/object_storage.rs
 create mode 100644 server/src/storage/s3.rs

diff --git a/server/Cargo.toml b/server/Cargo.toml
index 0cf6f78fa..1689ef34c 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -25,6 +25,7 @@ object_store = { version = "0.5.1", features = ["aws"] }
 derive_more = "0.99.17"
 env_logger = "0.9.0"
 futures = "0.3"
+fs_extra = "1.2.0"
 http = "0.2.4"
 humantime-serde = "1.1.1"
 lazy_static = "1.4.0"
@@ -33,6 +34,7 @@ num_cpus = "1.0.0"
 os_info = "3.0.7"
 hostname = "0.3"
 rand = "0.8.4"
+relative-path = "1.7.2"
 rustls = "0.20.6"
 rustls-pemfile = "1.0.1"
 rust-flatten-json = "0.2.0"
@@ -43,10 +45,11 @@ serde_json = "^1.0.8"
 sysinfo = "0.26.4"
 thiserror = "1"
 thread-priority = "0.9.2"
-tokio-stream = "0.1.8"
+tokio-stream = { version = "0.1.8", features = ["fs"] }
 tokio = { version = "1.13.1", default-features = false, features = [
     "sync",
     "macros",
+    "fs",
 ] }
 clokwerk = "0.4.0-rc1"
 actix-web-static-files = "4.0"
diff --git a/server/src/event.rs b/server/src/event.rs
index b1e8037df..661e8e133 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -35,8 +35,8 @@ use std::sync::RwLock;
 use crate::metadata;
 use crate::metadata::LOCK_EXPECT;
-use crate::s3;
-use crate::storage::{ObjectStorage, StorageDir};
+use crate::option::CONFIG;
+use crate::storage::{ObjectStorageProvider, StorageDir};
 use self::error::{EventError, StreamWriterError};
@@ -184,7 +184,7 @@ impl Event {
         } else {
             // if stream schema is none then it is first event,
             // process first event and store schema in obect store
-            self.process_first_event::(event, inferred_schema)?
+            self.process_first_event(event, inferred_schema)?
         };
         metadata::STREAM_INFO.update_stats(
@@ -202,7 +202,7 @@ impl Event {
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on rest of the events sent to this log stream.
- fn process_first_event( + fn process_first_event( &self, event: json::Reader, schema: Schema, @@ -241,13 +241,13 @@ impl Event { "setting schema on objectstore for logstream {}", stream_name ); - let storage = S::new(); + let storage = CONFIG.storage().get_object_store(); let stream_name = stream_name.clone(); spawn(async move { - if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await { + if let Err(e) = storage.put_schema(&stream_name, &schema).await { // If this call has failed then currently there is no right way to make local state consistent - // this needs a fix after more constraints are safety guarentee is provided by localwriter and s3_sync. + // this needs a fix after more constraints are safety guarentee is provided by localwriter and objectstore_sync. // Reasoning - // - After dropping lock many events may process through // - Processed events may sync before metadata deletion diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index c44950a0f..8346247e1 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -22,9 +22,10 @@ use actix_web::{web, HttpRequest, HttpResponse}; use serde_json::Value; use crate::event; +use crate::option::CONFIG; use crate::query::Query; use crate::response::QueryResponse; -use crate::s3::S3; +use crate::storage::ObjectStorageProvider; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use crate::utils::{self, flatten_json_body, merge}; @@ -39,9 +40,9 @@ pub async fn query(_req: HttpRequest, json: web::Json) -> Result::into) diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 59bfce4c4..cebf77865 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -24,8 +24,8 @@ use chrono::Utc; use serde_json::Value; use crate::alerts::Alerts; -use crate::s3::S3; -use crate::storage::{ObjectStorage, StorageDir}; +use crate::option::CONFIG; +use crate::storage::{ObjectStorageProvider, StorageDir}; use crate::{event, response}; use crate::{metadata, validator}; @@ -40,9 +40,9 @@ pub async fn delete(req: HttpRequest) -> HttpResponse { .to_http(); } - let s3 = S3::new(); + let objectstore = CONFIG.storage().get_object_store(); - if s3.get_schema(&stream_name).await.is_err() { + if objectstore.get_schema(&stream_name).await.is_err() { return response::ServerResponse { msg: format!("log stream {} does not exist", stream_name), code: StatusCode::BAD_REQUEST, @@ -50,7 +50,7 @@ pub async fn delete(req: HttpRequest) -> HttpResponse { .to_http(); } - if let Err(e) = s3.delete_stream(&stream_name).await { + if let Err(e) = objectstore.delete_stream(&stream_name).await { return response::ServerResponse { msg: format!( "failed to delete log stream {} due to err: {}", @@ -87,7 +87,14 @@ pub async fn delete(req: HttpRequest) -> HttpResponse { } pub async fn list(_: HttpRequest) -> impl Responder { - response::list_response(S3::new().list_streams().await.unwrap()) + response::list_response( + CONFIG + .storage() + .get_object_store() + .list_streams() + .await + .unwrap(), + ) } pub async fn schema(req: HttpRequest) -> HttpResponse { @@ -101,7 +108,12 @@ pub async fn schema(req: HttpRequest) -> HttpResponse { code: StatusCode::OK, } .to_http(), - Err(_) => match S3::new().get_schema(&stream_name).await { + Err(_) => match CONFIG + .storage() + .get_object_store() + .get_schema(&stream_name) + .await + { Ok(None) => response::ServerResponse { msg: "log stream is not initialized, please post an event before fetching schema" 
.to_string(), @@ -136,7 +148,12 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse { let mut alerts = match alerts { Some(alerts) => alerts, - None => match S3::new().get_alerts(&stream_name).await { + None => match CONFIG + .storage() + .get_object_store() + .get_alerts(&stream_name) + .await + { Ok(alerts) if alerts.alerts.is_empty() => { return response::ServerResponse { msg: "alert configuration not set for log stream {}".to_string(), @@ -233,7 +250,12 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) -> } } - if let Err(e) = S3::new().put_alerts(&stream_name, &alerts).await { + if let Err(e) = CONFIG + .storage() + .get_object_store() + .put_alerts(&stream_name, &alerts) + .await + { return response::ServerResponse { msg: format!( "failed to set alert configuration for log stream {} due to err: {}", @@ -333,8 +355,8 @@ pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse { } // Proceed to create log stream if it doesn't exist - let s3 = S3::new(); - if let Err(e) = s3.create_stream(&stream_name).await { + let storage = CONFIG.storage().get_object_store(); + if let Err(e) = storage.create_stream(&stream_name).await { // Fail if unable to create log stream on object store backend response::ServerResponse { msg: format!( diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index 5c02205ab..217ad8c2f 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -23,8 +23,7 @@ use actix_web::http::StatusCode; use actix_web::HttpResponse; use sysinfo::{System, SystemExt}; -use crate::s3::S3; -use crate::storage::ObjectStorage; +use crate::{option::CONFIG, storage::ObjectStorageProvider}; pub async fn liveness() -> HttpResponse { // If the available memory is less than 100MiB, return a 503 error. @@ -37,7 +36,7 @@ pub async fn liveness() -> HttpResponse { } pub async fn readiness() -> HttpResponse { - if let Ok(()) = S3::new().check().await { + if CONFIG.storage().get_object_store().check().await.is_ok() { return HttpResponse::new(StatusCode::OK); } diff --git a/server/src/main.rs b/server/src/main.rs index d1870c824..40e47ed4f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -47,15 +47,14 @@ mod metadata; mod option; mod query; mod response; -mod s3; mod stats; mod storage; mod utils; mod validator; use option::CONFIG; -use s3::S3; -use storage::ObjectStorage; + +use crate::storage::ObjectStorageProvider; // Global configurations const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000; @@ -67,16 +66,17 @@ async fn main() -> anyhow::Result<()> { env_logger::init(); CONFIG.print(); CONFIG.validate(); - let storage = S3::new(); - CONFIG.validate_storage(&storage).await; - if let Err(e) = metadata::STREAM_INFO.load(&storage).await { + let storage = CONFIG.storage().get_object_store(); + CONFIG.validate_storage(&*storage).await; + if let Err(e) = metadata::STREAM_INFO.load(&*storage).await { warn!("could not populate local metadata. {:?}", e); } // track all parquet files already in the data directory storage::CACHED_FILES.track_parquet(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync(); - let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync(); + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + object_store_sync(); let app = run_http(); tokio::pin!(app); @@ -84,10 +84,10 @@ async fn main() -> anyhow::Result<()> { tokio::select! { e = &mut app => { // actix server finished .. 
stop other threads and stop the server
-            s3sync_inbox.send(()).unwrap_or(());
+            remote_sync_inbox.send(()).unwrap_or(());
             localsync_inbox.send(()).unwrap_or(());
             localsync_handler.join().unwrap_or(());
-            s3sync_handler.join().unwrap_or(());
+            remote_sync_handler.join().unwrap_or(());
             return e
         },
         _ = &mut localsync_outbox => {
@@ -95,16 +95,16 @@ async fn main() -> anyhow::Result<()> {
             // panic!("Local Sync thread died. Server will fail now!")
             return Err(anyhow::Error::msg("Failed to sync local data to disc. This can happen due to critical error in disc or environment. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
         },
-        _ = &mut s3sync_outbox => {
-            // s3sync failed, this is recoverable by just starting s3sync thread again
-            s3sync_handler.join().unwrap_or(());
-            (s3sync_handler, s3sync_outbox, s3sync_inbox) = s3_sync();
+        _ = &mut remote_sync_outbox => {
+            // remote_sync failed, this is recoverable by just starting remote_sync thread again
+            remote_sync_handler.join().unwrap_or(());
+            (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
             }
         };
     }
 }
-fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
+fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
     let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
     let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
     let mut inbox_rx = AssertUnwindSafe(inbox_rx);
@@ -116,7 +116,7 @@ fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
         scheduler
             .every((CONFIG.parseable.upload_interval as u32).seconds())
             .run(|| async {
-                if let Err(e) = S3::new().s3_sync().await {
+                if let Err(e) = CONFIG.storage().get_object_store().sync().await {
                     warn!("failed to sync local data with object store. {:?}", e);
                 }
             });
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 0a39bfa22..8bedf5d20 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -116,8 +116,8 @@ impl STREAM_INFO {
         map.remove(stream_name);
     }
-    pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> {
-        // When loading streams this function will assume list_streams only returns valid streams.
+    pub async fn load(&self, storage: &(impl ObjectStorage + ?Sized)) -> Result<(), LoadError> {
+        // When loading streams this function will assume list_streams only returns valid streams.
         // a valid stream would have a .schema file.
         // .schema file could be empty in that case it will be treated as an uninitialized stream.
         // return error in case of an error from object storage itself.
diff --git a/server/src/option.rs b/server/src/option.rs
index 314d83c68..067c6417c 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -17,18 +17,20 @@ */
 use clap::builder::ArgPredicate;
-use clap::Parser;
+use clap::{Parser, Subcommand};
 use crossterm::style::Stylize;
 use std::path::PathBuf;
 use std::sync::Arc;
 use crate::banner;
-use crate::s3::S3Config;
-use crate::storage::{ObjectStorage, ObjectStorageError, LOCAL_SYNC_INTERVAL};
+use crate::storage::{
+    FSConfig, ObjectStorage, ObjectStorageError, ObjectStorageProvider, S3Config,
+    LOCAL_SYNC_INTERVAL,
+};
lazy_static::lazy_static!
{ #[derive(Debug)] - pub static ref CONFIG: Arc> = Arc::new(Config::new()); + pub static ref CONFIG: Arc = Arc::new(Config::new()); } pub const USERNAME_ENV: &str = "P_USERNAME"; @@ -36,24 +38,13 @@ pub const PASSWORD_ENV: &str = "P_PASSWORD"; pub const DEFAULT_USERNAME: &str = "parseable"; pub const DEFAULT_PASSWORD: &str = "parseable"; -pub trait StorageOpt: Sync + Send { - fn bucket_name(&self) -> &str; - fn endpoint_url(&self) -> &str; +pub struct Config { + pub parseable: Server, } -pub struct Config -where - S: Clone + clap::Args + StorageOpt, -{ - pub parseable: Server, -} - -impl Config -where - S: Clone + clap::Args + StorageOpt, -{ +impl Config { fn new() -> Self { - let Cli::Server::(args) = match Cli::::try_parse() { + let Cli::Server(args) = match Cli::try_parse() { Ok(s) => s, Err(e) => { e.exit(); @@ -62,10 +53,6 @@ where Config { parseable: args } } - pub fn storage(&self) -> &S { - &self.parseable.objectstore_config - } - pub fn print(&self) { let scheme = CONFIG.parseable.get_scheme(); self.status_info(&scheme); @@ -82,24 +69,21 @@ where } } - pub async fn validate_storage(&self, storage: &impl ObjectStorage) { + pub async fn validate_storage(&self, storage: &(impl ObjectStorage + ?Sized)) { match storage.check().await { Ok(_) => (), - Err(ObjectStorageError::NoSuchBucket(name)) => panic!( - "Could not start because the bucket doesn't exist. Please ensure bucket {bucket} exists on {url}", - bucket = name, - url = self.storage().endpoint_url() - ), Err(ObjectStorageError::ConnectionError(inner)) => panic!( "Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}", - url = self.storage().endpoint_url(), + url = self.storage().get_endpoint(), cause = inner ), Err(ObjectStorageError::AuthenticationError(inner)) => panic!( "Failed to authenticate. 
Please ensure credentials are valid\n Caused by: {cause}", cause = inner ), - Err(error) => { panic!("{error}") } + Err(error) => { + panic!("{error}") + } } } @@ -121,11 +105,10 @@ where " {} Local Data Path: {} - Object Storage: {}/{}", + Object Storage: {}", "Storage:".to_string().blue().bold(), self.parseable.local_disk_path.to_string_lossy(), - self.storage().endpoint_url(), - self.storage().bucket_name() + self.parseable.object_store.get_endpoint(), ) } @@ -145,6 +128,10 @@ where fn is_demo(&self) -> bool { self.parseable.demo } + + pub fn storage(&self) -> &impl ObjectStorageProvider { + &self.parseable.object_store + } } #[derive(Parser)] // requires `derive` feature @@ -154,19 +141,13 @@ where about = "Parseable is a log storage and observability platform.", version )] -enum Cli -where - S: Clone + clap::Args + StorageOpt, -{ - Server(Server), +enum Cli { + Server(Server), } #[derive(clap::Args, Debug, Clone)] #[clap(name = "server", about = "Start the Parseable server")] -pub struct Server -where - S: Clone + clap::Args + StorageOpt, -{ +pub struct Server { /// The location of TLS Cert file #[arg( long, @@ -233,18 +214,44 @@ where )] pub password: String, - #[clap(flatten)] - pub objectstore_config: S, + #[command(subcommand)] + pub object_store: ObjectStore, /// Run Parseable in demo mode with default credentials and open object store #[arg(short, long, exclusive = true)] pub demo: bool, } -impl Server -where - S: Clone + clap::Args + StorageOpt, -{ +#[derive(Debug, Clone, Subcommand)] +pub enum ObjectStore { + Drive(FSConfig), + S3(S3Config), +} + +impl ObjectStorageProvider for ObjectStore { + fn get_datafusion_runtime(&self) -> Arc { + match self { + ObjectStore::Drive(x) => x.get_datafusion_runtime(), + ObjectStore::S3(x) => x.get_datafusion_runtime(), + } + } + + fn get_object_store(&self) -> Arc { + match self { + ObjectStore::Drive(x) => x.get_object_store(), + ObjectStore::S3(x) => x.get_object_store(), + } + } + + fn get_endpoint(&self) -> String { + match self { + ObjectStore::Drive(x) => x.get_endpoint(), + ObjectStore::S3(x) => x.get_endpoint(), + } + } +} + +impl Server { pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_disk_path.join(stream_name) } diff --git a/server/src/query.rs b/server/src/query.rs index 2a2a235d8..e6a092fc7 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -29,10 +29,11 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use crate::storage; +use crate::option::CONFIG; use crate::storage::ObjectStorage; use crate::storage::ObjectStorageError; use crate::storage::StorageDir; +use crate::storage::{self, ObjectStorageProvider}; use crate::utils::TimePeriod; use crate::validator; @@ -75,7 +76,7 @@ impl Query { /// TODO: find a way to query all selected parquet files together in a single context. 
pub async fn execute( &self, - storage: &impl ObjectStorage, + storage: &(impl ObjectStorage + ?Sized), ) -> Result, ExecuteError> { let dir = StorageDir::new(&self.stream_name); @@ -98,8 +99,10 @@ impl Query { let parquet_files: Vec = possible_parquet_files.chain(parquet_files).collect(); - let ctx = - SessionContext::with_config_rt(SessionConfig::default(), storage.query_runtime_env()); + let ctx = SessionContext::with_config_rt( + SessionConfig::default(), + CONFIG.storage().get_datafusion_runtime(), + ); let table = Arc::new(QueryTableProvider::new( arrow_files, parquet_files, diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 44e94acde..7af93acaf 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -40,7 +40,7 @@ use std::sync::Arc; pub struct QueryTableProvider { arrow_files: Vec, parquet_files: Vec, - storage: ListingTable, + storage: Option, schema: Arc, } @@ -48,7 +48,7 @@ impl QueryTableProvider { pub fn new( arrow_files: Vec, parquet_files: Vec, - storage: ListingTable, + storage: Option, schema: Arc, ) -> Self { // By the time this query executes the arrow files could be converted to parquet files @@ -97,9 +97,17 @@ impl QueryTableProvider { let listexec = listtable.scan(ctx, projection, filters, limit).await?; Arc::new(UnionExec::new(vec![memexec, listexec])) }; - let storage_exec = self.storage.scan(ctx, projection, filters, limit).await?; - Ok(Arc::new(UnionExec::new(vec![cache_exec, storage_exec]))) + let mut exec = vec![cache_exec]; + if let Some(ref storage_listing) = self.storage { + exec.push( + storage_listing + .scan(ctx, projection, filters, limit) + .await?, + ); + } + + Ok(Arc::new(UnionExec::new(exec))) } } diff --git a/server/src/s3.rs b/server/src/s3.rs deleted file mode 100644 index f987be2da..000000000 --- a/server/src/s3.rs +++ /dev/null @@ -1,636 +0,0 @@ -/* - * Parseable Server (C) 2022 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use async_trait::async_trait; -use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind}; -use aws_sdk_s3::model::{CommonPrefix, Delete, ObjectIdentifier}; -use aws_sdk_s3::types::{ByteStream, SdkError}; -use aws_sdk_s3::Error as AwsSdkError; -use aws_sdk_s3::RetryConfig; -use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; -use aws_smithy_async::rt::sleep::default_async_sleep; -use bytes::Bytes; -use chrono::Local; -use clap::builder::ArgPredicate; -use datafusion::arrow::datatypes::Schema; -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::listing::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, -}; -use datafusion::datasource::object_store::ObjectStoreRegistry; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; -use futures::StreamExt; -use http::Uri; -use object_store::aws::AmazonS3Builder; -use object_store::limit::LimitStore; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::fs; -use std::iter::Iterator; -use std::sync::Arc; - -use crate::alerts::Alerts; -use crate::option::{StorageOpt, CONFIG}; -use crate::query::Query; -use crate::stats::Stats; -use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; - -// Default object storage currently is DO Spaces bucket -// Any user who starts the Parseable server with default configuration -// will point to this bucket and will see any data present on this bucket -const DEFAULT_S3_URL: &str = "https://minio.parseable.io:9000"; -const DEFAULT_S3_REGION: &str = "us-east-1"; -const DEFAULT_S3_BUCKET: &str = "parseable"; -const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin"; -const DEFAULT_S3_SECRET_KEY: &str = "minioadmin"; - -// metadata file names in a Stream prefix -const METADATA_FILE_NAME: &str = ".metadata.json"; -const SCHEMA_FILE_NAME: &str = ".schema"; -const ALERT_FILE_NAME: &str = ".alert.json"; - -// max concurrent request allowed for datafusion object store -const MAX_OBJECT_STORE_REQUESTS: usize = 1000; - -// all the supported permissions -// const PERMISSIONS_READ: &str = "readonly"; -// const PERMISSIONS_WRITE: &str = "writeonly"; -// const PERMISSIONS_DELETE: &str = "delete"; -// const PERMISSIONS_READ_WRITE: &str = "readwrite"; -const PERMISSIONS_ALL: &str = "all"; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ObjectStoreFormat { - pub version: String, - #[serde(rename = "objectstore-format")] - pub objectstore_format: String, - #[serde(rename = "created-at")] - pub created_at: String, - pub owner: Owner, - pub access: Access, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Owner { - pub id: String, - pub group: String, -} - -impl Owner { - pub fn new(id: String, group: String) -> Self { - Self { id, group } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Access { - pub objects: Vec, -} - -impl Access { - pub fn new(objects: Vec) -> Self { - Self { objects } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct AccessObject { - pub id: String, - pub group: String, - pub permissions: Vec, -} - -impl AccessObject { - pub fn new(id: String) -> Self { - Self { - id: id.clone(), - group: id, - permissions: vec![PERMISSIONS_ALL.to_string()], - } - } -} - -impl Default for ObjectStoreFormat { - fn default() -> Self { - Self { - version: "v1".to_string(), - objectstore_format: "v1".to_string(), - created_at: Local::now().to_rfc3339(), - owner: Owner::new("".to_string(), 
"".to_string()), - access: Access::new(vec![]), - } - } -} - -impl ObjectStoreFormat { - fn set_id(&mut self, id: String) { - self.owner.id.clone_from(&id); - self.owner.group = id; - } - fn set_access(&mut self, access: Vec) { - self.access.objects = access; - } -} - -lazy_static::lazy_static! { - #[derive(Debug)] - pub static ref S3_CONFIG: Arc = Arc::new(CONFIG.storage().clone()); - - // runtime to be used in query session - pub static ref STORAGE_RUNTIME: Arc = { - - let s3 = AmazonS3Builder::new() - .with_region(&S3_CONFIG.s3_region) - .with_endpoint(&S3_CONFIG.s3_endpoint_url) - .with_bucket_name(&S3_CONFIG.s3_bucket_name) - .with_access_key_id(&S3_CONFIG.s3_access_key_id) - .with_secret_access_key(&S3_CONFIG.s3_secret_key) - // allow http for local instances - .with_allow_http(true) - .build() - .unwrap(); - - // limit objectstore to a concurrent request limit - let s3 = LimitStore::new(s3, MAX_OBJECT_STORE_REQUESTS); - - let object_store_registry = ObjectStoreRegistry::new(); - object_store_registry.register_store("s3", &S3_CONFIG.s3_bucket_name, Arc::new(s3)); - - let config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)); - - let runtime = RuntimeEnv::new(config).unwrap(); - - Arc::new(runtime) - - }; -} - -#[derive(Debug, Clone, clap::Args)] -#[command(name = "S3 config", about = "configuration for AWS S3 SDK")] -pub struct S3Config { - /// The endpoint to AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_URL", - value_name = "url", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_URL) - )] - pub s3_endpoint_url: String, - - /// The access key for AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_ACCESS_KEY", - value_name = "access-key", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_ACCESS_KEY) - )] - pub s3_access_key_id: String, - - /// The secret key for AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_SECRET_KEY", - value_name = "secret-key", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_SECRET_KEY) - )] - pub s3_secret_key: String, - - /// The region for AWS S3 or compatible object storage platform - #[arg( - long, - env = "P_S3_REGION", - value_name = "region", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_REGION) - )] - pub s3_region: String, - - /// The AWS S3 or compatible object storage bucket to be used for storage - #[arg( - long, - env = "P_S3_BUCKET", - value_name = "bucket-name", - default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_BUCKET) - )] - pub s3_bucket_name: String, -} - -impl StorageOpt for S3Config { - fn bucket_name(&self) -> &str { - &self.s3_bucket_name - } - - fn endpoint_url(&self) -> &str { - &self.s3_endpoint_url - } -} - -struct S3Options { - endpoint: Endpoint, - region: Region, - creds: Credentials, -} - -impl S3Options { - fn new() -> Self { - let uri = S3_CONFIG.s3_endpoint_url.parse::().unwrap(); - let endpoint = Endpoint::immutable(uri); - let region = Region::new(&S3_CONFIG.s3_region); - let creds = Credentials::new( - &S3_CONFIG.s3_access_key_id, - &S3_CONFIG.s3_secret_key, - None, - None, - "", - ); - - Self { - endpoint, - region, - creds, - } - } -} - -pub struct S3 { - client: aws_sdk_s3::Client, -} - -impl S3 { - pub fn new() -> Self { - let options = S3Options::new(); - let config = aws_sdk_s3::Config::builder() - .region(options.region) - .endpoint_resolver(options.endpoint) - .credentials_provider(options.creds) - 
.retry_config(RetryConfig::standard().with_max_attempts(5)) - .sleep_impl(default_async_sleep().expect("sleep impl is provided for tokio rt")) - .build(); - - let client = Client::from_conf(config); - - Self { client } - } - - async fn _put_schema(&self, stream_name: String, body: String) -> Result<(), AwsSdkError> { - let _resp = self - .client - .put_object() - .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME)) - .body(body.into_bytes().into()) - .send() - .await?; - - Ok(()) - } - - async fn _create_stream(&self, stream_name: &str, format: Vec) -> Result<(), AwsSdkError> { - // create ./schema empty file in the stream-name prefix - // this indicates that the stream has been created - // but doesn't have any content yet - let _resp = self - .client - .put_object() - .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME)) - .send() - .await?; - self._put_stream_meta(stream_name, format).await?; - // Prefix created on S3, now create the directory in - // the local storage as well - let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name)); - Ok(()) - } - - async fn _put_stream_meta(&self, stream_name: &str, body: Vec) -> Result<(), AwsSdkError> { - let _resp = self - .client - .put_object() - .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/{}", stream_name, METADATA_FILE_NAME)) - .body(body.into()) - .send() - .await?; - - Ok(()) - } - - async fn _delete_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> { - let mut pages = self - .client - .list_objects_v2() - .bucket(&S3_CONFIG.s3_bucket_name) - .prefix(format!("{}/", stream_name)) - .into_paginator() - .send(); - - let mut delete_objects: Vec = vec![]; - while let Some(page) = pages.next().await { - let page = page?; - for obj in page.contents.unwrap() { - let obj_id = ObjectIdentifier::builder().set_key(obj.key).build(); - delete_objects.push(obj_id); - } - } - - let delete = Delete::builder().set_objects(Some(delete_objects)).build(); - - self.client - .delete_objects() - .bucket(&S3_CONFIG.s3_bucket_name) - .delete(delete) - .send() - .await?; - - Ok(()) - } - - async fn _put_alerts(&self, stream_name: &str, body: Vec) -> Result<(), AwsSdkError> { - let _resp = self - .client - .put_object() - .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/{}", stream_name, ALERT_FILE_NAME)) - .body(body.into()) - .send() - .await?; - - Ok(()) - } - - async fn _get_schema(&self, stream_name: &str) -> Result { - self._get(stream_name, SCHEMA_FILE_NAME).await - } - - async fn _alert_exists(&self, stream_name: &str) -> Result { - self._get(stream_name, ALERT_FILE_NAME).await - } - - async fn _get_stream_meta(&self, stream_name: &str) -> Result { - self._get(stream_name, METADATA_FILE_NAME).await - } - - async fn _get(&self, stream_name: &str, resource: &str) -> Result { - let resp = self - .client - .get_object() - .bucket(&S3_CONFIG.s3_bucket_name) - .key(format!("{}/{}", stream_name, resource)) - .send() - .await?; - let body = resp.body.collect().await; - let body_bytes = body.unwrap().into_bytes(); - Ok(body_bytes) - } - - #[allow(dead_code)] - async fn prefix_exists(&self, prefix: &str) -> Result { - // TODO check if head object is faster compared to list objects - let resp = self - .client - .list_objects_v2() - .bucket(&S3_CONFIG.s3_bucket_name) - .prefix(prefix) - .max_keys(1) - .send() - .await?; - - let result = resp.contents.is_some(); - - Ok(result) - } - - async fn _list_streams(&self) -> Result, AwsSdkError> { - let 
resp = self - .client - .list_objects_v2() - .bucket(&S3_CONFIG.s3_bucket_name) - .delimiter('/') - .send() - .await?; - - let common_prefixes = resp.common_prefixes().unwrap_or_default(); - - // return prefixes at the root level - let logstreams: Vec<_> = common_prefixes - .iter() - .filter_map(CommonPrefix::prefix) - .filter_map(|name| name.strip_suffix('/')) - .map(String::from) - .map(|name| LogStream { name }) - .collect(); - - Ok(logstreams) - } - - async fn _upload_file(&self, key: &str, path: &str) -> Result<(), AwsSdkError> { - let body = ByteStream::from_path(path).await.unwrap(); - let resp = self - .client - .put_object() - .bucket(&S3_CONFIG.s3_bucket_name) - .key(key) - .body(body) - .send() - .await?; - log::trace!("{:?}", resp); - - Ok(()) - } -} - -#[async_trait] -impl ObjectStorage for S3 { - fn new() -> Self { - Self::new() - } - - async fn check(&self) -> Result<(), ObjectStorageError> { - self.client - .head_bucket() - .bucket(&S3_CONFIG.s3_bucket_name) - .send() - .await - .map(|_| ()) - .map_err(|err| err.into()) - } - - async fn put_schema( - &self, - stream_name: String, - schema: &Schema, - ) -> Result<(), ObjectStorageError> { - self._put_schema(stream_name, serde_json::to_string(&schema)?) - .await?; - - Ok(()) - } - - async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { - let mut format = ObjectStoreFormat::default(); - format.set_id(CONFIG.parseable.username.clone()); - let access_object = AccessObject::new(CONFIG.parseable.username.clone()); - format.set_access(vec![access_object]); - - let body = serde_json::to_vec(&format)?; - self._create_stream(stream_name, body).await?; - - Ok(()) - } - - async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { - self._delete_stream(stream_name).await?; - - Ok(()) - } - - async fn put_alerts( - &self, - stream_name: &str, - alerts: &Alerts, - ) -> Result<(), ObjectStorageError> { - let body = serde_json::to_vec(alerts)?; - self._put_alerts(stream_name, body).await?; - - Ok(()) - } - - async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { - let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); - let parseable_metadata = self._get_stream_meta(stream_name).await?; - let mut parseable_metadata: Value = - serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); - - parseable_metadata["stats"] = stats; - - self._put_stream_meta(stream_name, parseable_metadata.to_string().into_bytes()) - .await?; - Ok(()) - } - - async fn get_schema(&self, stream_name: &str) -> Result, ObjectStorageError> { - let body_bytes = self._get_schema(stream_name).await?; - let schema = serde_json::from_slice(&body_bytes).ok(); - Ok(schema) - } - - async fn get_alerts(&self, stream_name: &str) -> Result { - let res = self._alert_exists(stream_name).await; - - match res { - Ok(bytes) => Ok(serde_json::from_slice(&bytes).unwrap_or_default()), - Err(e) => match e { - AwsSdkError::NoSuchKey(_) => Ok(Alerts::default()), - e => Err(e.into()), - }, - } - } - - async fn get_stats(&self, stream_name: &str) -> Result { - let parseable_metadata = self._get_stream_meta(stream_name).await?; - let parseable_metadata: Value = - serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); - - let stats = &parseable_metadata["stats"]; - - let stats = serde_json::from_value(stats.clone()).unwrap_or_default(); - - Ok(stats) - } - - async fn list_streams(&self) -> Result, 
ObjectStorageError> { - let streams = self._list_streams().await?; - - Ok(streams) - } - - async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError> { - self._upload_file(key, path).await?; - - Ok(()) - } - - fn query_table(&self, query: &Query) -> Result { - // Get all prefix paths and convert them into futures which yeilds ListingTableUrl - let prefixes = query - .get_prefixes() - .into_iter() - .map(|prefix| { - let path = format!("s3://{}/{}", &S3_CONFIG.s3_bucket_name, prefix); - ListingTableUrl::parse(path).unwrap() - }) - .collect(); - - let file_format = ParquetFormat::default().with_enable_pruning(true); - let listing_options = ListingOptions { - file_extension: ".data.parquet".to_string(), - format: Arc::new(file_format), - table_partition_cols: vec![], - collect_stat: true, - target_partitions: 1, - }; - - let config = ListingTableConfig::new_with_multi_paths(prefixes) - .with_listing_options(listing_options) - .with_schema(Arc::clone(&query.schema)); - - Ok(ListingTable::try_new(config)?) - } - - fn query_runtime_env(&self) -> Arc { - Arc::clone(&STORAGE_RUNTIME) - } -} - -impl From for ObjectStorageError { - fn from(error: AwsSdkError) -> Self { - ObjectStorageError::UnhandledError(Box::new(error)) - } -} - -impl From> for ObjectStorageError { - fn from(error: SdkError) -> Self { - match error { - SdkError::ServiceError { - err: - HeadBucketError { - kind: HeadBucketErrorKind::NotFound(_), - .. - }, - .. - } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()), - SdkError::ServiceError { - err: - HeadBucketError { - kind: HeadBucketErrorKind::Unhandled(err), - .. - }, - .. - } => ObjectStorageError::AuthenticationError(err), - SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(Box::new(err)), - SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err), - err => ObjectStorageError::UnhandledError(Box::new(err)), - } - } -} - -impl From for ObjectStorageError { - fn from(error: serde_json::Error) -> Self { - ObjectStorageError::UnhandledError(Box::new(error)) - } -} diff --git a/server/src/storage.rs b/server/src/storage.rs index 4b8b2e280..7e7fedf5c 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,46 +16,125 @@ * */ -use crate::alerts::Alerts; -use crate::metadata::{LOCK_EXPECT, STREAM_INFO}; +use crate::metadata::STREAM_INFO; use crate::option::CONFIG; -use crate::query::Query; -use crate::stats::Stats; + use crate::storage::file_link::{FileLink, FileTable}; use crate::utils; -use async_trait::async_trait; -use chrono::{NaiveDateTime, Timelike, Utc}; -use datafusion::arrow::datatypes::Schema; +use chrono::{Local, NaiveDateTime, Timelike, Utc}; use datafusion::arrow::error::ArrowError; -use datafusion::arrow::ipc::reader::StreamReader; -use datafusion::datasource::listing::ListingTable; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::parquet::arrow::ArrowWriter; use datafusion::parquet::errors::ParquetError; -use datafusion::parquet::file::properties::WriterProperties; use lazy_static::lazy_static; -use serde::Serialize; +use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fmt::Debug; -use std::fs::{self, File}; -use std::iter::Iterator; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use self::file_link::CacheState; +mod file_link; +mod localfs; +mod object_storage; +mod s3; + +pub use localfs::{FSConfig, LocalFS}; +pub use object_storage::{ObjectStorage, ObjectStorageProvider}; +pub use 
s3::{S3Config, S3}; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. pub const LOCAL_SYNC_INTERVAL: u64 = 60; -/// duration used to configure prefix in s3 and local disk structure +/// duration used to configure prefix in objectstore and local disk structure /// used for storage. Defaults to 1 min. pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60; +// max concurrent request allowed for datafusion object store +const MAX_OBJECT_STORE_REQUESTS: usize = 1000; + +// all the supported permissions +// const PERMISSIONS_READ: &str = "readonly"; +// const PERMISSIONS_WRITE: &str = "writeonly"; +// const PERMISSIONS_DELETE: &str = "delete"; +// const PERMISSIONS_READ_WRITE: &str = "readwrite"; +const PERMISSIONS_ALL: &str = "all"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ObjectStoreFormat { + pub version: String, + #[serde(rename = "objectstore-format")] + pub objectstore_format: String, + #[serde(rename = "created-at")] + pub created_at: String, + pub owner: Owner, + pub access: Access, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Owner { + pub id: String, + pub group: String, +} + +impl Owner { + pub fn new(id: String, group: String) -> Self { + Self { id, group } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Access { + pub objects: Vec, +} + +impl Access { + pub fn new(objects: Vec) -> Self { + Self { objects } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AccessObject { + pub id: String, + pub group: String, + pub permissions: Vec, +} + +impl AccessObject { + pub fn new(id: String) -> Self { + Self { + id: id.clone(), + group: id, + permissions: vec![PERMISSIONS_ALL.to_string()], + } + } +} + +impl Default for ObjectStoreFormat { + fn default() -> Self { + Self { + version: "v1".to_string(), + objectstore_format: "v1".to_string(), + created_at: Local::now().to_rfc3339(), + owner: Owner::new("".to_string(), "".to_string()), + access: Access::new(vec![]), + } + } +} + +impl ObjectStoreFormat { + fn set_id(&mut self, id: String) { + self.owner.id.clone_from(&id); + self.owner.group = id; + } + fn set_access(&mut self, access: Vec) { + self.access.objects = access; + } +} + lazy_static! 
{ pub static ref CACHED_FILES: Mutex> = Mutex::new(FileTable::new()); + pub static ref STORAGE_RUNTIME: Arc = CONFIG.storage().get_datafusion_runtime(); } impl CACHED_FILES { @@ -69,144 +148,6 @@ impl CACHED_FILES { } } -#[async_trait] -pub trait ObjectStorage: Sync + 'static { - fn new() -> Self; - async fn check(&self) -> Result<(), ObjectStorageError>; - async fn put_schema( - &self, - stream_name: String, - schema: &Schema, - ) -> Result<(), ObjectStorageError>; - async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; - async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; - - async fn put_alerts( - &self, - stream_name: &str, - alerts: &Alerts, - ) -> Result<(), ObjectStorageError>; - async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError>; - async fn get_schema(&self, stream_name: &str) -> Result, ObjectStorageError>; - async fn get_alerts(&self, stream_name: &str) -> Result; - async fn get_stats(&self, stream_name: &str) -> Result; - async fn list_streams(&self) -> Result, ObjectStorageError>; - async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError>; - fn query_table(&self, query: &Query) -> Result; - fn query_runtime_env(&self) -> Arc; - - async fn s3_sync(&self) -> Result<(), MoveDataError> { - if !Path::new(&CONFIG.parseable.local_disk_path).exists() { - return Ok(()); - } - - let streams = STREAM_INFO.list_streams(); - - let mut stream_stats = HashMap::new(); - - for stream in &streams { - // get dir - let dir = StorageDir::new(stream); - // walk dir, find all .arrows files and convert to parquet - - let mut arrow_files = dir.arrow_files(); - // Do not include file which is being written to - let hot_file = dir.path_by_current_time(); - let hot_filename = hot_file.file_name().expect("is a not none filename"); - - arrow_files.retain(|file| { - !file - .file_name() - .expect("is a not none filename") - .eq(hot_filename) - }); - - for file in arrow_files { - let arrow_file = File::open(&file).map_err(|_| MoveDataError::Open)?; - let reader = StreamReader::try_new(arrow_file, None)?; - let schema = reader.schema(); - let records = reader.filter_map(|record| match record { - Ok(record) => Some(record), - Err(e) => { - log::warn!("warning from arrow stream {:?}", e); - None - } - }); - - let mut parquet_path = file.clone(); - parquet_path.set_extension("parquet"); - let mut parquet_table = CACHED_FILES.lock().unwrap(); - let parquet_file = - fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - parquet_table.upsert(&parquet_path); - - let props = WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?; - - for ref record in records { - writer.write(record)?; - } - - writer.close()?; - - fs::remove_file(file).map_err(|_| MoveDataError::Delete)?; - } - - for file in dir.parquet_files() { - let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata; - if metadata != CacheState::Idle { - continue; - } - - let filename = file - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - let file_suffix = str::replacen(filename, ".", "/", 3); - let s3_path = format!("{}/{}", stream, file_suffix); - CACHED_FILES - .lock() - .unwrap() - .get_mut(&file) - .set_metadata(CacheState::Uploading); - let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?; - CACHED_FILES - .lock() - .unwrap() - 
.get_mut(&file) - .set_metadata(CacheState::Uploaded); - - stream_stats - .entry(stream) - .and_modify(|size| *size += file.metadata().map_or(0, |meta| meta.len())) - .or_insert_with(|| file.metadata().map_or(0, |meta| meta.len())); - - CACHED_FILES.lock().unwrap().remove(&file); - } - } - - for (stream, compressed_size) in stream_stats { - let stats = STREAM_INFO - .read() - .expect(LOCK_EXPECT) - .get(stream) - .map(|metadata| { - metadata.stats.add_storage_size(compressed_size); - Stats::from(&metadata.stats) - }); - - if let Some(stats) = stats { - if let Err(e) = self.put_stats(stream, &stats).await { - log::warn!("Error updating stats to s3 due to error [{}]", e); - } - } - } - - Ok(()) - } -} - #[derive(Serialize)] pub struct LogStream { pub name: String, @@ -266,94 +207,6 @@ impl StorageDir { } } -pub mod file_link { - use std::{ - collections::HashMap, - path::{Path, PathBuf}, - }; - - pub trait Link { - fn links(&self) -> usize; - fn increase_link_count(&mut self) -> usize; - fn decreate_link_count(&mut self) -> usize; - } - - #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] - pub enum CacheState { - #[default] - Idle, - Uploading, - Uploaded, - } - - #[derive(Debug)] - pub struct FileLink { - link: usize, - pub metadata: CacheState, - } - - impl Default for FileLink { - fn default() -> Self { - Self { - link: 1, - metadata: CacheState::Idle, - } - } - } - - impl FileLink { - pub fn set_metadata(&mut self, state: CacheState) { - self.metadata = state - } - } - - impl Link for FileLink { - fn links(&self) -> usize { - self.link - } - - fn increase_link_count(&mut self) -> usize { - self.link.saturating_add(1) - } - - fn decreate_link_count(&mut self) -> usize { - self.link.saturating_sub(1) - } - } - - pub struct FileTable { - inner: HashMap, - } - - impl FileTable { - pub fn new() -> Self { - Self { - inner: HashMap::default(), - } - } - - pub fn upsert(&mut self, path: &Path) { - if let Some(entry) = self.inner.get_mut(path) { - entry.increase_link_count(); - } else { - self.inner.insert(path.to_path_buf(), L::default()); - } - } - - pub fn remove(&mut self, path: &Path) { - let Some(link_count) = self.inner.get_mut(path).map(|entry| entry.decreate_link_count()) else { return }; - if link_count == 0 { - let _ = std::fs::remove_file(path); - self.inner.remove(path); - } - } - - pub fn get_mut(&mut self, path: &Path) -> &mut L { - self.inner.get_mut(path).unwrap() - } - } -} - #[derive(Debug, thiserror::Error)] pub enum MoveDataError { #[error("Unable to Open file after moving")] @@ -363,7 +216,7 @@ pub enum MoveDataError { #[error("Could not generate parquet file")] Parquet(#[from] ParquetError), #[error("Object Storage Error {0}")] - ObjectStorag(#[from] ObjectStorageError), + ObjectStorage(#[from] ObjectStorageError), #[error("Could not generate parquet file")] Create, #[error("Could not delete temp arrow file")] @@ -372,16 +225,25 @@ pub enum MoveDataError { #[derive(Debug, thiserror::Error)] pub enum ObjectStorageError { - #[error("Bucket {0} not found")] - NoSuchBucket(String), + // no such key inside the object storage + #[error("{0} not found")] + NoSuchKey(String), + + // Could not connect to object storage #[error("Connection Error: {0}")] ConnectionError(Box), + + // IO Error when reading a file or listing path #[error("IO Error: {0}")] IoError(#[from] std::io::Error), + + // Datafusion error during a query #[error("DataFusion Error: {0}")] DataFusionError(#[from] datafusion::error::DataFusionError), + #[error("Unhandled Error: {0}")] UnhandledError(Box), + 
#[error("Authentication Error: {0}")] AuthenticationError(Box), } diff --git a/server/src/storage/file_link.rs b/server/src/storage/file_link.rs new file mode 100644 index 000000000..a084a7722 --- /dev/null +++ b/server/src/storage/file_link.rs @@ -0,0 +1,103 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; + +pub trait Link { + fn links(&self) -> usize; + fn increase_link_count(&mut self) -> usize; + fn decreate_link_count(&mut self) -> usize; +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub enum CacheState { + #[default] + Idle, + Uploading, + Uploaded, +} + +#[derive(Debug)] +pub struct FileLink { + link: usize, + pub metadata: CacheState, +} + +impl Default for FileLink { + fn default() -> Self { + Self { + link: 1, + metadata: CacheState::Idle, + } + } +} + +impl FileLink { + pub fn set_metadata(&mut self, state: CacheState) { + self.metadata = state + } +} + +impl Link for FileLink { + fn links(&self) -> usize { + self.link + } + + fn increase_link_count(&mut self) -> usize { + self.link.saturating_add(1) + } + + fn decreate_link_count(&mut self) -> usize { + self.link.saturating_sub(1) + } +} + +pub struct FileTable { + inner: HashMap, +} + +impl FileTable { + pub fn new() -> Self { + Self { + inner: HashMap::default(), + } + } + + pub fn upsert(&mut self, path: &Path) { + if let Some(entry) = self.inner.get_mut(path) { + entry.increase_link_count(); + } else { + self.inner.insert(path.to_path_buf(), L::default()); + } + } + + pub fn remove(&mut self, path: &Path) { + let Some(link_count) = self.inner.get_mut(path).map(|entry| entry.decreate_link_count()) else { return }; + if link_count == 0 { + let _ = std::fs::remove_file(path); + self.inner.remove(path); + } + } + + pub fn get_mut(&mut self, path: &Path) -> &mut L { + self.inner.get_mut(path).unwrap() + } +} diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs new file mode 100644 index 000000000..18852cc2d --- /dev/null +++ b/server/src/storage/localfs.rs @@ -0,0 +1,193 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use datafusion::{ + datasource::{ + file_format::parquet::ParquetFormat, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + }, + execution::runtime_env::{RuntimeConfig, RuntimeEnv}, +}; +use fs_extra::file::{move_file, CopyOptions}; +use futures::StreamExt; +use relative_path::RelativePath; +use tokio::fs; +use tokio_stream::wrappers::ReadDirStream; + +use crate::query::Query; + +use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider}; + +#[derive(Debug, Clone, clap::Args)] +#[command( + name = "Local filesystem config", + about = "configuration for using local filesystem for storage" +)] +pub struct FSConfig { + #[arg(long, env = "P_FS_PATH", value_name = "path")] + root: PathBuf, +} + +impl ObjectStorageProvider for FSConfig { + fn get_datafusion_runtime(&self) -> Arc { + let config = RuntimeConfig::new(); + let runtime = RuntimeEnv::new(config).unwrap(); + Arc::new(runtime) + } + + fn get_object_store(&self) -> Arc { + Arc::new(LocalFS::new(self.root.clone())) + } + + fn get_endpoint(&self) -> String { + self.root.to_str().unwrap().to_string() + } +} + +pub struct LocalFS { + root: PathBuf, +} + +impl LocalFS { + pub fn new(root: PathBuf) -> Self { + Self { root } + } + + pub fn path_in_root(&self, path: &RelativePath) -> PathBuf { + path.to_path(&self.root) + } +} + +#[async_trait] +impl ObjectStorage for LocalFS { + async fn get_object(&self, path: &RelativePath) -> Result { + let file_path = self.path_in_root(path); + match fs::read(file_path).await { + Ok(x) => Ok(x.into()), + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => { + Err(ObjectStorageError::NoSuchKey(path.to_string())) + } + _ => Err(ObjectStorageError::UnhandledError(Box::new(e))), + }, + } + } + + async fn put_object( + &self, + path: &RelativePath, + resource: Bytes, + ) -> Result<(), ObjectStorageError> { + let path = self.path_in_root(path); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + Ok(fs::write(path, resource).await?) + } + + async fn check(&self) -> Result<(), ObjectStorageError> { + Ok(fs::create_dir_all(&self.root).await?) + } + + async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + let path = self.root.join(stream_name); + Ok(fs::remove_dir_all(path).await?) + } + async fn list_streams(&self) -> Result, ObjectStorageError> { + let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); + let directories = directories + .filter_map(|res| async { + let entry = res.ok()?; + if entry.file_type().await.ok()?.is_dir() { + Some(LogStream { + name: entry + .path() + .file_name() + .expect("valid path") + .to_str() + .expect("valid unicode") + .to_string(), + }) + } else { + None + } + }) + .collect::>() + .await; + + Ok(directories) + } + + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { + let op = CopyOptions { + overwrite: true, + skip_exist: true, + ..CopyOptions::default() + }; + let to_path = self.root.join(key); + if let Some(path) = to_path.parent() { + fs::create_dir_all(path).await? 
+ } + let _ = move_file(path, to_path, &op)?; + + Ok(()) + } + + fn query_table(&self, query: &Query) -> Result, ObjectStorageError> { + let prefixes: Vec = query + .get_prefixes() + .into_iter() + .filter_map(|prefix| { + let path = self.root.join(prefix); + ListingTableUrl::parse(path.to_str().unwrap()).ok() + }) + .collect(); + + if prefixes.is_empty() { + return Ok(None); + } + + let file_format = ParquetFormat::default().with_enable_pruning(true); + let listing_options = ListingOptions { + file_extension: ".parquet".to_string(), + format: Arc::new(file_format), + table_partition_cols: vec![], + collect_stat: true, + target_partitions: 1, + }; + + let config = ListingTableConfig::new_with_multi_paths(prefixes) + .with_listing_options(listing_options) + .with_schema(Arc::clone(&query.schema)); + + Ok(Some(ListingTable::try_new(config)?)) + } +} + +impl From for ObjectStorageError { + fn from(e: fs_extra::error::Error) -> Self { + ObjectStorageError::UnhandledError(Box::new(e)) + } +} diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs new file mode 100644 index 000000000..2f852210f --- /dev/null +++ b/server/src/storage/object_storage.rs @@ -0,0 +1,275 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use super::{ + file_link::CacheState, AccessObject, LogStream, MoveDataError, ObjectStorageError, + ObjectStoreFormat, StorageDir, CACHED_FILES, +}; +use crate::{alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats}; + +use arrow_schema::Schema; +use async_trait::async_trait; +use bytes::Bytes; +use datafusion::{ + arrow::ipc::reader::StreamReader, + datasource::listing::ListingTable, + execution::runtime_env::RuntimeEnv, + parquet::{arrow::ArrowWriter, file::properties::WriterProperties}, +}; +use relative_path::RelativePath; +use relative_path::RelativePathBuf; +use serde::Serialize; +use serde_json::Value; + +use std::{ + collections::HashMap, + fs::{self, File}, + path::Path, + sync::Arc, +}; + +// metadata file names in a Stream prefix +const METADATA_FILE_NAME: &str = ".metadata.json"; +const SCHEMA_FILE_NAME: &str = ".schema"; +const ALERT_FILE_NAME: &str = ".alert.json"; + +pub trait ObjectStorageProvider { + fn get_datafusion_runtime(&self) -> Arc; + fn get_object_store(&self) -> Arc; + fn get_endpoint(&self) -> String; +} + +#[async_trait] +pub trait ObjectStorage: Sync + 'static { + async fn get_object(&self, path: &RelativePath) -> Result; + async fn put_object( + &self, + path: &RelativePath, + resource: Bytes, + ) -> Result<(), ObjectStorageError>; + async fn check(&self) -> Result<(), ObjectStorageError>; + async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; + async fn list_streams(&self) -> Result, ObjectStorageError>; + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; + fn query_table(&self, query: &Query) -> Result, ObjectStorageError>; + + async fn put_schema( + &self, + stream_name: &str, + schema: &Schema, + ) -> Result<(), ObjectStorageError> { + self.put_object(&schema_path(stream_name), to_bytes(schema)) + .await?; + + Ok(()) + } + + async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + let mut format = ObjectStoreFormat::default(); + format.set_id(CONFIG.parseable.username.clone()); + let access_object = AccessObject::new(CONFIG.parseable.username.clone()); + format.set_access(vec![access_object]); + + let format_json = to_bytes(&format); + + self.put_object(&schema_path(stream_name), "".into()) + .await?; + self.put_object(&metadata_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + + async fn put_alerts( + &self, + stream_name: &str, + alerts: &Alerts, + ) -> Result<(), ObjectStorageError> { + self.put_object(&alert_json_path(stream_name), to_bytes(alerts)) + .await + } + + async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { + let path = metadata_json_path(stream_name); + let parseable_metadata = self.get_object(&path).await?; + let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); + let mut parseable_metadata: serde_json::Value = + serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json"); + + parseable_metadata["stats"] = stats; + + self.put_object(&path, to_bytes(&parseable_metadata)).await + } + + async fn get_schema(&self, stream_name: &str) -> Result, ObjectStorageError> { + let schema = self.get_object(&schema_path(stream_name)).await?; + let schema = serde_json::from_slice(&schema).ok(); + Ok(schema) + } + + async fn get_alerts(&self, stream_name: &str) -> Result { + match self.get_object(&alert_json_path(stream_name)).await { + Ok(alerts) => Ok(serde_json::from_slice(&alerts).unwrap_or_default()), + 
+            Err(e) => match e {
+                ObjectStorageError::NoSuchKey(_) => Ok(Alerts::default()),
+                e => Err(e),
+            },
+        }
+    }
+
+    async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
+        let parseable_metadata = self.get_object(&metadata_json_path(stream_name)).await?;
+        let parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        let stats = &parseable_metadata["stats"];
+
+        let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
+
+        Ok(stats)
+    }
+
+    async fn sync(&self) -> Result<(), MoveDataError> {
+        if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
+            return Ok(());
+        }
+
+        let streams = STREAM_INFO.list_streams();
+
+        let mut stream_stats = HashMap::new();
+
+        for stream in &streams {
+            // get the local data directory for this stream
+            let dir = StorageDir::new(stream);
+            // walk dir, find all .arrows files and convert to parquet
+
+            let mut arrow_files = dir.arrow_files();
+            // Do not include the file that is currently being written to
+            let hot_file = dir.path_by_current_time();
+            let hot_filename = hot_file.file_name().expect("file path has a valid filename");
+
+            arrow_files.retain(|file| {
+                !file
+                    .file_name()
+                    .expect("file path has a valid filename")
+                    .eq(hot_filename)
+            });
+
+            for file in arrow_files {
+                let arrow_file = File::open(&file).map_err(|_| MoveDataError::Open)?;
+                let reader = StreamReader::try_new(arrow_file, None)?;
+                let schema = reader.schema();
+                let records = reader.filter_map(|record| match record {
+                    Ok(record) => Some(record),
+                    Err(e) => {
+                        log::warn!("warning from arrow stream {:?}", e);
+                        None
+                    }
+                });
+
+                let mut parquet_path = file.clone();
+                parquet_path.set_extension("parquet");
+                let mut parquet_table = CACHED_FILES.lock().unwrap();
+                let parquet_file =
+                    fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+                parquet_table.upsert(&parquet_path);
+
+                let props = WriterProperties::builder().build();
+                let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
+
+                for ref record in records {
+                    writer.write(record)?;
+                }
+
+                writer.close()?;
+
+                fs::remove_file(file).map_err(|_| MoveDataError::Delete)?;
+            }
+
+            for file in dir.parquet_files() {
+                let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata;
+                if metadata != CacheState::Idle {
+                    continue;
+                }
+
+                let filename = file
+                    .file_name()
+                    .expect("only parquet files are returned by iterator")
+                    .to_str()
+                    .expect("filename is valid string");
+                let file_suffix = str::replacen(filename, ".", "/", 3);
+                let objectstore_path = format!("{}/{}", stream, file_suffix);
+                CACHED_FILES
+                    .lock()
+                    .unwrap()
+                    .get_mut(&file)
+                    .set_metadata(CacheState::Uploading);
+                let _put_parquet_file = self.upload_file(&objectstore_path, &file).await?;
+                CACHED_FILES
+                    .lock()
+                    .unwrap()
+                    .get_mut(&file)
+                    .set_metadata(CacheState::Uploaded);
+
+                stream_stats
+                    .entry(stream)
+                    .and_modify(|size| *size += file.metadata().map_or(0, |meta| meta.len()))
+                    .or_insert_with(|| file.metadata().map_or(0, |meta| meta.len()));
+
+                CACHED_FILES.lock().unwrap().remove(&file);
+            }
+        }
+
+        for (stream, compressed_size) in stream_stats {
+            let stats = STREAM_INFO.read().unwrap().get(stream).map(|metadata| {
+                metadata.stats.add_storage_size(compressed_size);
+                Stats::from(&metadata.stats)
+            });
+
+            if let Some(stats) = stats {
+                if let Err(e) = self.put_stats(stream, &stats).await {
+                    log::warn!("Error updating stats in object store due to error [{}]", e);
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[inline(always)]
+fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
+    serde_json::to_vec(any)
+        .map(|any| any.into())
+        .expect("serialize cannot fail")
+}
+
+#[inline(always)]
+fn schema_path(stream_name: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])
+}
+
+#[inline(always)]
+fn metadata_json_path(stream_name: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([stream_name, METADATA_FILE_NAME])
+}
+
+#[inline(always)]
+fn alert_json_path(stream_name: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([stream_name, ALERT_FILE_NAME])
+}
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
new file mode 100644
index 000000000..7d621a005
--- /dev/null
+++ b/server/src/storage/s3.rs
@@ -0,0 +1,369 @@
+/*
+ * Parseable Server (C) 2022 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use async_trait::async_trait;
+use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind};
+use aws_sdk_s3::model::{CommonPrefix, Delete, ObjectIdentifier};
+use aws_sdk_s3::types::{ByteStream, SdkError};
+use aws_sdk_s3::Error as AwsSdkError;
+use aws_sdk_s3::RetryConfig;
+use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
+use aws_smithy_async::rt::sleep::default_async_sleep;
+use bytes::Bytes;
+use clap::builder::ArgPredicate;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::object_store::ObjectStoreRegistry;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use futures::StreamExt;
+use http::Uri;
+use object_store::aws::AmazonS3Builder;
+use object_store::limit::LimitStore;
+use relative_path::RelativePath;
+
+use std::iter::Iterator;
+use std::path::Path;
+use std::sync::Arc;
+
+use crate::query::Query;
+
+use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
+
+use super::ObjectStorageProvider;
+
+// Default object storage is a MinIO bucket hosted by Parseable
+// Any user who starts the Parseable server in demo mode
+// will point to this bucket and will see any data present in it
+const DEFAULT_S3_URL: &str = "https://minio.parseable.io:9000";
+const DEFAULT_S3_REGION: &str = "us-east-1";
+const DEFAULT_S3_BUCKET: &str = "parseable";
+const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
+const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";
+
+#[derive(Debug, Clone, clap::Args)]
+#[command(name = "S3 config", about = "configuration for AWS S3 SDK")]
+pub struct S3Config {
+    /// The endpoint to AWS S3 or compatible object storage platform
+    #[arg(
+        long,
+        env = "P_S3_URL",
+        value_name = "url",
+        default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_URL)
+    )]
+    pub s3_endpoint_url: String,
+
+    /// The access key for AWS S3 or compatible object storage platform
+    #[arg(
+        long,
+        env = "P_S3_ACCESS_KEY",
+        value_name = "access-key",
+        default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_ACCESS_KEY)
+    )]
+    pub s3_access_key_id: String,
+
+    /// The secret key for AWS S3 or compatible object storage platform
+    #[arg(
+        long,
+        env = "P_S3_SECRET_KEY",
+        value_name = "secret-key",
+        default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_SECRET_KEY)
+    )]
+    pub s3_secret_key: String,
+
+    /// The region for AWS S3 or compatible object storage platform
+    #[arg(
+        long,
+        env = "P_S3_REGION",
+        value_name = "region",
+        default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_REGION)
+    )]
+    pub s3_region: String,
+
+    /// The AWS S3 or compatible object storage bucket to be used for storage
+    #[arg(
+        long,
+        env = "P_S3_BUCKET",
+        value_name = "bucket-name",
+        default_value_if("demo", ArgPredicate::IsPresent, DEFAULT_S3_BUCKET)
+    )]
+    pub s3_bucket_name: String,
+}
+
+impl ObjectStorageProvider for S3Config {
+    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
+        let s3 = AmazonS3Builder::new()
+            .with_region(&self.s3_region)
+            .with_endpoint(&self.s3_endpoint_url)
+            .with_bucket_name(&self.s3_bucket_name)
+            .with_access_key_id(&self.s3_access_key_id)
+            .with_secret_access_key(&self.s3_secret_key)
+            // allow http for local instances
+            .with_allow_http(true)
+            .build()
+            .unwrap();
+
+        // limit object store to a fixed number of concurrent requests
+        let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
+
+        let object_store_registry = ObjectStoreRegistry::new();
+        object_store_registry.register_store("s3", &self.s3_bucket_name, Arc::new(s3));
+
+        let config =
+            RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
+
+        let runtime = RuntimeeEnv::new(config).unwrap();
+
+        Arc::new(runtime)
+    }
+
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
+        let uri = self.s3_endpoint_url.parse::<Uri>().unwrap();
+        let endpoint = Endpoint::immutable(uri);
+        let region = Region::new(self.s3_region.clone());
+        let creds = Credentials::new(&self.s3_access_key_id, &self.s3_secret_key, None, None, "");
+
+        let config = aws_sdk_s3::Config::builder()
+            .region(region)
+            .endpoint_resolver(endpoint)
+            .credentials_provider(creds)
+            .retry_config(RetryConfig::standard().with_max_attempts(5))
+            .sleep_impl(default_async_sleep().expect("sleep impl is provided for tokio rt"))
+            .build();
+
+        let client = Client::from_conf(config);
+
+        Arc::new(S3 {
+            client,
+            bucket: self.s3_bucket_name.clone(),
+        })
+    }
+
+    fn get_endpoint(&self) -> String {
+        format!("{}/{}", self.s3_endpoint_url, self.s3_bucket_name)
+    }
+}
+
+pub struct S3 {
+    client: aws_sdk_s3::Client,
+    bucket: String,
+}
+
+impl S3 {
+    async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, AwsSdkError> {
+        let resp = self
+            .client
+            .get_object()
+            .bucket(&self.bucket)
+            .key(path.as_str())
+            .send()
+            .await?;
+        let body = resp.body.collect().await;
+        let body_bytes = body.unwrap().into_bytes();
+        Ok(body_bytes)
+    }
+
+    async fn _delete_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> {
+        let mut pages = self
+            .client
+            .list_objects_v2()
+            .bucket(&self.bucket)
+            .prefix(format!("{}/", stream_name))
+            .into_paginator()
+            .send();
+
+        let mut delete_objects: Vec<ObjectIdentifier> = vec![];
+        while let Some(page) = pages.next().await {
+            let page = page?;
+            for obj in page.contents.unwrap() {
+                let obj_id = ObjectIdentifier::builder().set_key(obj.key).build();
+                delete_objects.push(obj_id);
+            }
+        }
+
+        let delete = Delete::builder().set_objects(Some(delete_objects)).build();
+
+        self.client
+            .delete_objects()
+            .bucket(&self.bucket)
+            .delete(delete)
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    async fn _list_streams(&self) -> Result<Vec<LogStream>, AwsSdkError> {
+        let resp = self
+            .client
+            .list_objects_v2()
+            .bucket(&self.bucket)
+            .delimiter('/')
+            .send()
+            .await?;
+
+        let common_prefixes = resp.common_prefixes().unwrap_or_default();
+
+        // return prefixes at the root level
+        let logstreams: Vec<_> = common_prefixes
+            .iter()
+            .filter_map(CommonPrefix::prefix)
+            .filter_map(|name| name.strip_suffix('/'))
+            .map(String::from)
+            .map(|name| LogStream { name })
+            .collect();
+
+        Ok(logstreams)
+    }
+
+    async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), AwsSdkError> {
+        let body = ByteStream::from_path(path).await.unwrap();
+        let resp = self
+            .client
+            .put_object()
+            .bucket(&self.bucket)
+            .key(key)
+            .body(body)
+            .send()
+            .await?;
+        log::trace!("{:?}", resp);
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl ObjectStorage for S3 {
+    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
+        Ok(self._get_object(path).await?)
+    }
+
+    async fn put_object(
+        &self,
+        path: &RelativePath,
+        resource: Bytes,
+    ) -> Result<(), ObjectStorageError> {
+        let _resp = self
+            .client
+            .put_object()
+            .bucket(&self.bucket)
+            .key(path.as_str())
+            .body(resource.into())
+            .send()
+            .await
+            .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?;
+
+        Ok(())
+    }
+
+    async fn check(&self) -> Result<(), ObjectStorageError> {
+        self.client
+            .head_bucket()
+            .bucket(&self.bucket)
+            .send()
+            .await
+            .map(|_| ())
+            .map_err(|err| err.into())
+    }
+
+    async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
+        self._delete_stream(stream_name).await?;
+
+        Ok(())
+    }
+
+    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let streams = self._list_streams().await?;
+
+        Ok(streams)
+    }
+
+    async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
+        self._upload_file(key, path).await?;
+
+        Ok(())
+    }
+
+    fn query_table(&self, query: &Query) -> Result<Option<ListingTable>, ObjectStorageError> {
+        // Get all prefix paths and convert them into ListingTableUrls
+        let prefixes: Vec<ListingTableUrl> = query
+            .get_prefixes()
+            .into_iter()
+            .map(|prefix| {
+                let path = format!("s3://{}/{}", &self.bucket, prefix);
+                ListingTableUrl::parse(path).unwrap()
+            })
+            .collect();
+
+        if prefixes.is_empty() {
+            return Ok(None);
+        }
+
+        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let listing_options = ListingOptions {
+            file_extension: ".parquet".to_string(),
+            format: Arc::new(file_format),
+            table_partition_cols: vec![],
+            collect_stat: true,
+            target_partitions: 1,
+        };
+
+        let config = ListingTableConfig::new_with_multi_paths(prefixes)
+            .with_listing_options(listing_options)
+            .with_schema(Arc::clone(&query.schema));
+
+        Ok(Some(ListingTable::try_new(config)?))
+    }
+}
+
+impl From<AwsSdkError> for ObjectStorageError {
+    fn from(error: AwsSdkError) -> Self {
+        match error {
+            AwsSdkError::NotFound(_) | AwsSdkError::NoSuchKey(_) => {
+                ObjectStorageError::NoSuchKey("".to_string())
+            }
+            e => ObjectStorageError::UnhandledError(Box::new(e)),
+        }
+    }
+}
+
+impl From<SdkError<HeadBucketError>> for ObjectStorageError {
+    fn from(error: SdkError<HeadBucketError>) -> Self {
+        match error {
+            SdkError::ServiceError {
+                err:
+                    HeadBucketError {
+                        kind: HeadBucketErrorKind::Unhandled(err),
+                        ..
+                    },
+                ..
+            } => ObjectStorageError::AuthenticationError(err),
+            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(Box::new(err)),
+            SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
+            err => ObjectStorageError::UnhandledError(Box::new(err)),
+        }
+    }
+}
+
+impl From<serde_json::Error> for ObjectStorageError {
+    fn from(error: serde_json::Error) -> Self {
+        ObjectStorageError::UnhandledError(Box::new(error))
+    }
+}
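
Note for reviewers (not part of the patch): a minimal sketch of how the new trait-object
storage API is intended to be consumed. The helper function, its argument values, and the
`use` paths below are illustrative assumptions; inside the server the trait object comes
from the runtime configuration rather than a hand-built S3Config, and a Tokio runtime is
assumed to already be running (actix-web provides one in the server binary).

    use std::sync::Arc;

    use bytes::Bytes;
    use relative_path::RelativePathBuf;

    use crate::storage::s3::S3Config;
    use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStorageProvider};

    // Illustrative only: exercises the backend-agnostic ObjectStorage trait.
    async fn storage_smoke_test(config: &S3Config) -> Result<(), ObjectStorageError> {
        // Every provider hands out the same trait object, so callers never need
        // to know whether S3 or the local filesystem is backing it.
        let store: Arc<dyn ObjectStorage + Send> = config.get_object_store();

        // Verify the backend is reachable before serving requests.
        store.check().await?;

        // Write a small object under a stream prefix and read it back.
        let path = RelativePathBuf::from("demo_stream/.schema");
        store.put_object(&path, Bytes::from_static(b"{}")).await?;
        let bytes = store.get_object(&path).await?;
        assert_eq!(bytes.as_ref(), b"{}");

        // List the stream prefixes visible at the root of the bucket or directory.
        for stream in store.list_streams().await? {
            println!("found stream: {}", stream.name);
        }

        Ok(())
    }

The same calls work unchanged against the LocalFS-backed implementation, which is the point
of routing all storage access through the ObjectStorage trait object.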