From a578b114c41b01361c4d4389dd648c9a1bd75283 Mon Sep 17 00:00:00 2001 From: Piyush Singariya Date: Mon, 30 Sep 2024 00:46:45 +0530 Subject: [PATCH 1/2] WIP: adding azure blob as object store --- server/Cargo.toml | 2 +- server/src/metrics/storage.rs | 39 +++++++++++++ server/src/option.rs | 28 +++++++++- server/src/storage.rs | 2 + server/src/storage/azure_blob.rs | 95 ++++++++++++++++++++++++++++++++ server/src/storage/s3.rs | 24 +++++--- 6 files changed, 179 insertions(+), 11 deletions(-) create mode 100644 server/src/storage/azure_blob.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index cf20002cd..f2f4e597a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ arrow-json = "52.1.0" arrow-ipc = { version = "52.1.0", features = ["zstd"] } arrow-select = "52.1.0" datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" } -object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up +object_store = { version = "0.10.2", features = ["cloud", "aws", "azure"] } # cannot update object_store as datafusion has not caught up parquet = "52.1.0" arrow-flight = { version = "52.1.0", features = [ "tls" ] } tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs index abb06aa94..a91c431cb 100644 --- a/server/src/metrics/storage.rs +++ b/server/src/metrics/storage.rs @@ -86,3 +86,42 @@ pub mod s3 { } } } + +pub mod azureblob { + use crate::{metrics::METRICS_NAMESPACE, storage::AzureBlobConfig}; + use once_cell::sync::Lazy; + use prometheus::{HistogramOpts, HistogramVec}; + + use super::StorageMetrics; + + pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("azr_blob_response_time", "AzureBlob Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric 
can be created") + }); + + pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("query_azr_blob_response_time", "AzureBlob Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric can be created") + }); + + impl StorageMetrics for AzureBlobConfig { + fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { + handler + .registry + .register(Box::new(REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); + handler + .registry + .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); + } + } +} diff --git a/server/src/option.rs b/server/src/option.rs index 00a699752..6b4876c39 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -18,7 +18,7 @@ use crate::cli::Cli; use crate::storage::object_storage::parseable_json_path; -use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config}; +use crate::storage::{AzureBlobConfig, FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config}; use bytes::Bytes; use clap::error::ErrorKind; use clap::{command, Args, Command, FromArgMatches}; @@ -105,6 +105,22 @@ Cloud Native, log analytics platform for modern applications."#, storage_name: "s3", } } + Some(("azure-blob", m)) => { + let cli = match Cli::from_arg_matches(m) { + Ok(cli) => cli, + Err(err) => err.exit(), + }; + let storage = match AzureBlobConfig::from_arg_matches(m) { + Ok(storage) => storage, + Err(err) => err.exit(), + }; + + Config { + parseable: cli, + storage: Arc::new(storage), + storage_name: "azure_blob", + } + } _ => unreachable!(), } } @@ -163,11 +179,14 @@ Cloud Native, log analytics platform for modern applications."#, // returns the string representation of the storage mode // drive --> Local drive // s3 --> S3 bucket + // azure_blob --> Azure Blob Storage pub fn get_storage_mode_string(&self) -> &str { if self.storage_name 
== "drive" { return "Local drive"; + } else if self.storage_name == "s3" { + return "S3 bucket" } - "S3 bucket" + return "Azure Blob Storage" } pub fn get_server_mode_string(&self) -> &str { @@ -193,6 +212,9 @@ fn create_parseable_cli_command() -> Command { let s3 = Cli::create_cli_command_with_clap("s3-store"); let s3 = ::augment_args_for_update(s3); + let azureblob = Cli::create_cli_command_with_clap("azure-blob"); + let azureblob = ::augment_args_for_update(azureblob); + command!() .name("Parseable") .bin_name("parseable") @@ -207,7 +229,7 @@ Join the community at https://logg.ing/community. "#, ) .subcommand_required(true) - .subcommands([local, s3]) + .subcommands([local, s3, azureblob]) } #[derive(Debug, Default, Eq, PartialEq)] diff --git a/server/src/storage.rs b/server/src/storage.rs index 040c7ae26..8c8f7fd1d 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -29,6 +29,7 @@ mod metrics_layer; pub(crate) mod object_storage; pub mod retention; mod s3; +mod azure_blob; pub mod staging; mod store_metadata; @@ -37,6 +38,7 @@ pub use self::staging::StorageDir; pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::S3Config; +pub use azure_blob::AzureBlobConfig; pub use store_metadata::{ put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata, }; diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs new file mode 100644 index 000000000..90ca0f934 --- /dev/null +++ b/server/src/storage/azure_blob.rs @@ -0,0 +1,95 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ +use object_store::azure::{MicrosoftAzureBuilder, MicrosoftAzure}; +use super::s3::ObjStoreClient; +use super::ObjectStorageProvider; +use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::datasource::object_store::{ + DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl, +}; +use crate::metrics::storage::StorageMetrics; +use std::sync::Arc; +use object_store::limit::LimitStore; +use super::metrics_layer::MetricLayer; +use object_store::path::Path as StorePath; + +#[derive(Debug, Clone, clap::Args)] +#[command( + name = "Azure config", + about = "Start Parseable with Azure Blob storage", + help_template = "\ +{about-section} +{all-args} +" +)] +pub struct AzureBlobConfig { + // The Azure Storage Account ID + #[arg(long, env = "P_AZR_ACCOUNT", value_name = "account", required = true)] + pub account: String, + + /// The Azure Storage Access key + #[arg(long, env = "P_AZR_ACCESS_KEY", value_name = "access-key", required = true)] + pub access_key: String, + + /// The container name to be used for storage + #[arg(long, env = "P_AZR_CONTAINER", value_name = "container", required = true)] + pub container: String, +} + +impl AzureBlobConfig { + fn get_default_interface(&self) -> MicrosoftAzure { + let result = MicrosoftAzureBuilder::new() + .with_account(self.account.clone()) + .with_access_key(self.access_key.clone()) + .with_container_name(self.container.clone()) + .build().expect("Failed to build Microsoft Azure interface"); + + return result + } +} + +impl ObjectStorageProvider for AzureBlobConfig { + fn get_datafusion_runtime(&self) -> RuntimeConfig { + 
let azure = self.get_default_interface(); + // limit objectstore to a concurrent request limit + let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); + let azure = MetricLayer::new(azure); + + let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new(); + let url = ObjectStoreUrl::parse(format!("az://{}", &self.container)).unwrap(); + object_store_registry.register_store(url.as_ref(), Arc::new(azure)); + + RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) + } + + fn get_object_store(&self) -> Arc { + let azure = self.get_default_interface(); + + // limit objectstore to a concurrent request limit + let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); + Arc::new(ObjStoreClient::new(azure, self.container.clone(), StorePath::from(""))) + } + + fn get_endpoint(&self) -> String { + return String::from("to be implmented") + } + + fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { + self.register_metrics(handler) + } +} diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d359a11a3..ad156d933 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -25,7 +25,7 @@ use datafusion::datasource::object_store::{ use datafusion::execution::runtime_env::RuntimeConfig; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; -use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum}; +use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey, Checksum}; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; use object_store::{ClientOptions, ObjectStore, PutPayload}; @@ -200,7 +200,7 @@ impl ObjectStorageProvider for S3Config { // limit objectstore to a concurrent request limit let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); - Arc::new(S3 { + Arc::new(ObjStoreClient { client: s3, bucket: self.bucket_name.clone(), root: 
StorePath::from(""), @@ -216,17 +216,27 @@ impl ObjectStorageProvider for S3Config { } } -fn to_object_store_path(path: &RelativePath) -> StorePath { +pub fn to_object_store_path(path: &RelativePath) -> StorePath { StorePath::from(path.as_str()) } -pub struct S3 { - client: LimitStore, +// ObjStoreClient is a generic client to enable interactions with different cloud providers' +// object stores such as S3 and Azure Blob +pub struct ObjStoreClient { + client: LimitStore, bucket: String, root: StorePath, } -impl S3 { +impl ObjStoreClient { + pub fn new(client: LimitStore, bucket: String, root: StorePath) -> Self{ + ObjStoreClient { + client: client, + bucket: bucket, + root:root, + } + } + async fn _get_object(&self, path: &RelativePath) -> Result { let instant = Instant::now(); @@ -448,7 +458,7 @@ impl S3 { } #[async_trait] -impl ObjectStorage for S3 { +impl ObjectStorage for ObjStoreClient { async fn get_object(&self, path: &RelativePath) -> Result { Ok(self._get_object(path).await?) } From 9c83ef30cc194d5ff5ec116ab11990564d774fb8 Mon Sep 17 00:00:00 2001 From: Piyush Singariya Date: Sat, 5 Oct 2024 17:28:16 +0530 Subject: [PATCH 2/2] adding URL to azure blob config Signed-off-by: Piyush Singariya --- server/src/storage/azure_blob.rs | 32 ++++++++++++++++++++++---------- server/src/storage/s3.rs | 4 ++-- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs index 90ca0f934..2355f48f9 100644 --- a/server/src/storage/azure_blob.rs +++ b/server/src/storage/azure_blob.rs @@ -15,8 +15,8 @@ * along with this program. If not, see . 
 * */ -use object_store::azure::{MicrosoftAzureBuilder, MicrosoftAzure}; -use super::s3::ObjStoreClient; +use object_store::azure::MicrosoftAzureBuilder; +use super::s3::{ObjStoreClient, CONNECT_TIMEOUT_SECS, REQUEST_TIMEOUT_SECS}; use super::ObjectStorageProvider; use datafusion::execution::runtime_env::RuntimeConfig; use datafusion::datasource::object_store::{ @@ -27,6 +27,9 @@ use std::sync::Arc; use object_store::limit::LimitStore; use super::metrics_layer::MetricLayer; use object_store::path::Path as StorePath; +use object_store::ClientOptions; +use std::time::Duration; + #[derive(Debug, Clone, clap::Args)] #[command( @@ -38,6 +41,10 @@ use object_store::path::Path as StorePath; " )] pub struct AzureBlobConfig { + // The Azure Blob endpoint URL + #[arg(long, env = "P_AZR_URL", value_name = "url", required = false)] + pub url: String, + // The Azure Storage Account ID + #[arg(long, env = "P_AZR_ACCOUNT", value_name = "account", required = true)] + pub account: String, @@ -52,20 +59,25 @@ pub struct AzureBlobConfig { } impl AzureBlobConfig { - fn get_default_interface(&self) -> MicrosoftAzure { - let result = MicrosoftAzureBuilder::new() + fn get_default_builder(&self) -> MicrosoftAzureBuilder { + let client_options = ClientOptions::default() + .with_allow_http(true) + .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS)) + .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS)); + + let builder = MicrosoftAzureBuilder::new() + .with_endpoint(self.url.clone()) .with_account(self.account.clone()) .with_access_key(self.access_key.clone()) - .with_container_name(self.container.clone()) - .build().expect("Failed to build Microsoft Azure interface"); + .with_container_name(self.container.clone()); - return result + return builder.with_client_options(client_options) } } impl ObjectStorageProvider for AzureBlobConfig { fn get_datafusion_runtime(&self) -> RuntimeConfig { - let azure = 
self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); let azure = MetricLayer::new(azure); @@ -78,7 +90,7 @@ impl ObjectStorageProvider for AzureBlobConfig { } fn get_object_store(&self) -> Arc { - let azure = self.get_default_interface(); + let azure = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); @@ -86,7 +98,7 @@ impl ObjectStorageProvider for AzureBlobConfig { } fn get_endpoint(&self) -> String { - return String::from("to be implmented") + return self.url.clone() } fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index ad156d933..5bb1a235e 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -50,8 +50,8 @@ use super::{ #[allow(dead_code)] // in bytes const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; -const CONNECT_TIMEOUT_SECS: u64 = 5; -const REQUEST_TIMEOUT_SECS: u64 = 300; +pub const CONNECT_TIMEOUT_SECS: u64 = 5; +pub const REQUEST_TIMEOUT_SECS: u64 = 300; const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"; #[derive(Debug, Clone, clap::Args)]