2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -16,7 +16,7 @@ arrow-json = "52.1.0"
 arrow-ipc = { version = "52.1.0", features = ["zstd"] }
 arrow-select = "52.1.0"
 datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
-object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
+object_store = { version = "0.10.2", features = ["cloud", "aws", "azure"] } # cannot update object_store as datafusion has not caught up
 parquet = "52.1.0"
 arrow-flight = { version = "52.1.0", features = [ "tls" ] }
 tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
39 changes: 39 additions & 0 deletions server/src/metrics/storage.rs
@@ -86,3 +86,42 @@ pub mod s3 {
         }
     }
 }
+
+pub mod azureblob {
+    use crate::{metrics::METRICS_NAMESPACE, storage::AzureBlobConfig};
+    use once_cell::sync::Lazy;
+    use prometheus::{HistogramOpts, HistogramVec};
+
+    use super::StorageMetrics;
+
+    pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+        HistogramVec::new(
+            HistogramOpts::new("azr_blob_response_time", "AzureBlob Request Latency")
+                .namespace(METRICS_NAMESPACE),
+            &["method", "status"],
+        )
+        .expect("metric can be created")
+    });
+
+    pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+        HistogramVec::new(
+            HistogramOpts::new("query_azr_blob_response_time", "AzureBlob Request Latency")
+                .namespace(METRICS_NAMESPACE),
+            &["method", "status"],
+        )
+        .expect("metric can be created")
+    });
+
+    impl StorageMetrics for AzureBlobConfig {
+        fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+            handler
+                .registry
+                .register(Box::new(REQUEST_RESPONSE_TIME.clone()))
+                .expect("metric can be registered");
+            handler
+                .registry
+                .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone()))
+                .expect("metric can be registered");
+        }
+    }
+}
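These `HistogramVec` collectors are driven by per-request `observe` calls labeled with method and status. Below is a minimal standalone sketch of that usage, assuming the `prometheus` and `once_cell` crates; `DEMO_HISTOGRAM` is a stand-in for `REQUEST_RESPONSE_TIME` (in the server, the observation itself happens inside the storage client and metrics layer, not at user call sites):

```rust
use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec};
use std::time::Instant;

// Stand-in for REQUEST_RESPONSE_TIME above (namespace omitted).
static DEMO_HISTOGRAM: Lazy<HistogramVec> = Lazy::new(|| {
    HistogramVec::new(
        HistogramOpts::new("azr_blob_response_time", "AzureBlob Request Latency"),
        &["method", "status"],
    )
    .expect("metric can be created")
});

fn main() {
    let start = Instant::now();
    // ... an Azure Blob request would run here ...
    DEMO_HISTOGRAM
        .with_label_values(&["GET", "200"])
        .observe(start.elapsed().as_secs_f64());

    let samples = DEMO_HISTOGRAM
        .with_label_values(&["GET", "200"])
        .get_sample_count();
    println!("recorded {samples} sample(s)");
}
```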
28 changes: 25 additions & 3 deletions server/src/option.rs
@@ -18,7 +18,7 @@
 
 use crate::cli::Cli;
 use crate::storage::object_storage::parseable_json_path;
-use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
+use crate::storage::{AzureBlobConfig, FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
 use bytes::Bytes;
 use clap::error::ErrorKind;
 use clap::{command, Args, Command, FromArgMatches};
@@ -105,6 +105,22 @@ Cloud Native, log analytics platform for modern applications."#,
                     storage_name: "s3",
                 }
             }
+            Some(("azure-blob", m)) => {
+                let cli = match Cli::from_arg_matches(m) {
+                    Ok(cli) => cli,
+                    Err(err) => err.exit(),
+                };
+                let storage = match AzureBlobConfig::from_arg_matches(m) {
+                    Ok(storage) => storage,
+                    Err(err) => err.exit(),
+                };
+
+                Config {
+                    parseable: cli,
+                    storage: Arc::new(storage),
+                    storage_name: "azure_blob",
+                }
+            }
             _ => unreachable!(),
         }
     }
@@ -163,11 +179,14 @@ Cloud Native, log analytics platform for modern applications."#,
     // returns the string representation of the storage mode
     // drive --> Local drive
     // s3 --> S3 bucket
+    // azure_blob --> Azure Blob Storage
     pub fn get_storage_mode_string(&self) -> &str {
         if self.storage_name == "drive" {
             return "Local drive";
+        } else if self.storage_name == "s3" {
+            return "S3 bucket";
         }
-        "S3 bucket"
+        "Azure Blob Storage"
     }
 
     pub fn get_server_mode_string(&self) -> &str {
@@ -193,6 +212,9 @@ fn create_parseable_cli_command() -> Command {
     let s3 = Cli::create_cli_command_with_clap("s3-store");
     let s3 = <S3Config as Args>::augment_args_for_update(s3);
 
+    let azureblob = Cli::create_cli_command_with_clap("azure-blob");
+    let azureblob = <AzureBlobConfig as Args>::augment_args_for_update(azureblob);
+
     command!()
         .name("Parseable")
         .bin_name("parseable")
@@ -207,7 +229,7 @@ Join the community at https://logg.ing/community.
 "#,
         )
         .subcommand_required(true)
-        .subcommands([local, s3])
+        .subcommands([local, s3, azureblob])
 }
 
 #[derive(Debug, Default, Eq, PartialEq)]
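The dispatch above follows clap's augment-and-match pattern: each storage backend gets its own subcommand, the backend's `Args` are grafted onto it with `augment_args_for_update`, and `Config::new` matches on the subcommand name to recover the typed config. A runnable sketch of the same pattern, assuming clap with the `derive` feature; `DemoAzureArgs` is a simplified hypothetical stand-in for `AzureBlobConfig`:

```rust
use clap::{Args, Command, FromArgMatches};

/// Simplified stand-in for AzureBlobConfig (hypothetical, for illustration).
#[derive(Debug, Args)]
struct DemoAzureArgs {
    /// Mirrors the PR's --container / P_AZR_CONTAINER flag
    #[arg(long)]
    container: String,
}

fn main() {
    // Build the subcommand and graft the backend's flags onto it,
    // as create_parseable_cli_command does for "azure-blob".
    let cmd = Command::new("parseable")
        .subcommand_required(true)
        .subcommand(DemoAzureArgs::augment_args_for_update(Command::new(
            "azure-blob",
        )));

    // Dispatch on the subcommand name, then recover the typed config.
    let matches = cmd.get_matches_from(["parseable", "azure-blob", "--container", "logs"]);
    if let Some(("azure-blob", m)) = matches.subcommand() {
        let args = DemoAzureArgs::from_arg_matches(m).expect("valid args");
        println!("container = {}", args.container);
    }
}
```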
2 changes: 2 additions & 0 deletions server/src/storage.rs
@@ -29,6 +29,7 @@ mod metrics_layer;
 pub(crate) mod object_storage;
 pub mod retention;
 mod s3;
+mod azure_blob;
 pub mod staging;
 mod store_metadata;
 
@@ -37,6 +38,7 @@ pub use self::staging::StorageDir;
 pub use localfs::FSConfig;
 pub use object_storage::{ObjectStorage, ObjectStorageProvider};
 pub use s3::S3Config;
+pub use azure_blob::AzureBlobConfig;
 pub use store_metadata::{
     put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
 };
107 changes: 107 additions & 0 deletions server/src/storage/azure_blob.rs
@@ -0,0 +1,107 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+use std::sync::Arc;
+use std::time::Duration;
+
+use datafusion::datasource::object_store::{
+    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
+};
+use datafusion::execution::runtime_env::RuntimeConfig;
+use object_store::azure::MicrosoftAzureBuilder;
+use object_store::limit::LimitStore;
+use object_store::path::Path as StorePath;
+use object_store::ClientOptions;
+
+use crate::metrics::storage::StorageMetrics;
+
+use super::metrics_layer::MetricLayer;
+use super::s3::{ObjStoreClient, CONNECT_TIMEOUT_SECS, REQUEST_TIMEOUT_SECS};
+use super::ObjectStorageProvider;
+
+#[derive(Debug, Clone, clap::Args)]
+#[command(
+    name = "Azure config",
+    about = "Start Parseable with Azure Blob storage",
+    help_template = "\
+{about-section}
+{all-args}
+"
+)]
+pub struct AzureBlobConfig {
+    /// The endpoint URL of the Azure Blob storage service
+    #[arg(long, env = "P_AZR_URL", value_name = "url", required = false)]
+    pub url: String,
+
+    /// The Azure Storage account name
+    #[arg(long, env = "P_AZR_ACCOUNT", value_name = "account", required = true)]
+    pub account: String,
+
+    /// The Azure Storage access key
+    #[arg(long, env = "P_AZR_ACCESS_KEY", value_name = "access-key", required = true)]
+    pub access_key: String,
+
+    /// The container name to be used for storage
+    #[arg(long, env = "P_AZR_CONTAINER", value_name = "container", required = true)]
+    pub container: String,
+}
+
+impl AzureBlobConfig {
+    fn get_default_builder(&self) -> MicrosoftAzureBuilder {
+        let client_options = ClientOptions::default()
+            .with_allow_http(true)
+            .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
+            .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS));
+
+        let builder = MicrosoftAzureBuilder::new()
+            .with_endpoint(self.url.clone())
+            .with_account(self.account.clone())
+            .with_access_key(self.access_key.clone())
+            .with_container_name(self.container.clone());
+
+        builder.with_client_options(client_options)
+    }
+}
+
+impl ObjectStorageProvider for AzureBlobConfig {
+    fn get_datafusion_runtime(&self) -> RuntimeConfig {
+        let azure = self.get_default_builder().build().unwrap();
+        // limit object store to a concurrent request limit
+        let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
+        let azure = MetricLayer::new(azure);
+
+        let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
+        let url = ObjectStoreUrl::parse(format!("az://{}", &self.container)).unwrap();
+        object_store_registry.register_store(url.as_ref(), Arc::new(azure));
+
+        RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
+    }
+
+    fn get_object_store(&self) -> Arc<dyn super::ObjectStorage + Send> {
+        let azure = self.get_default_builder().build().unwrap();
+
+        // limit object store to a concurrent request limit
+        let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
+        Arc::new(ObjStoreClient::new(
+            azure,
+            self.container.clone(),
+            StorePath::from(""),
+        ))
+    }
+
+    fn get_endpoint(&self) -> String {
+        self.url.clone()
+    }
+
+    fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+        self.register_metrics(handler)
+    }
+}
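Stepping outside the provider plumbing, the builder configured above can be exercised directly against the `object_store` API. Here is a hedged, standalone sketch of the put/get round trip that `ObjStoreClient` ultimately serves, assuming the `tokio` runtime and valid Azure credentials; the account, key, and container values are placeholders:

```rust
use object_store::azure::MicrosoftAzureBuilder;
use object_store::{path::Path, ObjectStore, PutPayload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder credentials; a real run needs a reachable account.
    let store = MicrosoftAzureBuilder::new()
        .with_account("demo-account")
        .with_access_key("demo-key")
        .with_container_name("demo-container")
        .build()?;

    // Write and read back a small object, the same calls ObjStoreClient
    // issues through its LimitStore-wrapped inner store.
    let path = Path::from(".parseable.json");
    store.put(&path, PutPayload::from("{}")).await?;
    let bytes = store.get(&path).await?.bytes().await?;
    println!("read {} bytes back", bytes.len());
    Ok(())
}
```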
28 changes: 19 additions & 9 deletions server/src/storage/s3.rs
@@ -25,7 +25,7 @@ use datafusion::datasource::object_store::{
 use datafusion::execution::runtime_env::RuntimeConfig;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
-use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
+use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey, Checksum};
 use object_store::limit::LimitStore;
 use object_store::path::Path as StorePath;
 use object_store::{ClientOptions, ObjectStore, PutPayload};
@@ -50,8 +50,8 @@ use super::{
 #[allow(dead_code)]
 // in bytes
 const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
-const CONNECT_TIMEOUT_SECS: u64 = 5;
-const REQUEST_TIMEOUT_SECS: u64 = 300;
+pub const CONNECT_TIMEOUT_SECS: u64 = 5;
+pub const REQUEST_TIMEOUT_SECS: u64 = 300;
 const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
 
 #[derive(Debug, Clone, clap::Args)]
@@ -200,7 +200,7 @@ impl ObjectStorageProvider for S3Config {
         // limit objectstore to a concurrent request limit
         let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
 
-        Arc::new(S3 {
+        Arc::new(ObjStoreClient {
             client: s3,
             bucket: self.bucket_name.clone(),
             root: StorePath::from(""),
@@ -216,17 +216,27 @@ impl ObjectStorageProvider for S3Config {
     }
 }
 
-fn to_object_store_path(path: &RelativePath) -> StorePath {
+pub fn to_object_store_path(path: &RelativePath) -> StorePath {
     StorePath::from(path.as_str())
 }
 
-pub struct S3 {
-    client: LimitStore<AmazonS3>,
+// ObjStoreClient is a generic client that enables interaction with the
+// object stores of different cloud providers, such as S3 and Azure Blob
+pub struct ObjStoreClient<T: ObjectStore> {
+    client: LimitStore<T>,
     bucket: String,
     root: StorePath,
 }
 
-impl S3 {
+impl<T: ObjectStore> ObjStoreClient<T> {
+    pub fn new(client: LimitStore<T>, bucket: String, root: StorePath) -> Self {
+        ObjStoreClient {
+            client,
+            bucket,
+            root,
+        }
+    }
+
     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         let instant = Instant::now();
 
@@ -448,7 +458,7 @@
 }
 
 #[async_trait]
-impl ObjectStorage for S3 {
+impl<T: ObjectStore> ObjectStorage for ObjStoreClient<T> {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         Ok(self._get_object(path).await?)
     }
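The payoff of replacing the concrete `S3` struct with `ObjStoreClient<T: ObjectStore>` is that one client implementation can now wrap any `object_store` backend behind the same `LimitStore` throttle. A minimal sketch of that pattern, assuming `tokio`; `Client` is a stand-in for `ObjStoreClient`, and the `InMemory` store stands in for S3 or Azure:

```rust
use object_store::{limit::LimitStore, memory::InMemory, path::Path, ObjectStore, PutPayload};

// Stand-in for ObjStoreClient: generic over any ObjectStore backend.
struct Client<T: ObjectStore> {
    store: LimitStore<T>,
}

impl<T: ObjectStore> Client<T> {
    async fn put_object(&self, key: &str, body: &'static str) -> object_store::Result<()> {
        self.store.put(&Path::from(key), PutPayload::from(body)).await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> object_store::Result<()> {
    // The same Client compiles against S3, Azure, or this in-memory store.
    let client = Client {
        store: LimitStore::new(InMemory::new(), 10),
    };
    client.put_object("stream/.schema", "{}").await
}
```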