diff --git a/Cargo.lock b/Cargo.lock index 72a5f4264c..6c29453b99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3883,6 +3883,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "sha2", "tokio", "uuid", ] diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index f84e7ab67a..c559f6ec34 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -30,12 +30,13 @@ keywords = ["iceberg"] [features] default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] -storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] +storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-azblob"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] storage-gcs = ["opendal/services-gcs"] +storage-azblob = ["opendal/services-azblob"] async-std = ["dep:async-std"] tokio = ["dep:tokio"] diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 330f210c6e..a71e1b0f07 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -41,6 +41,7 @@ use crate::{Error, ErrorKind, Result}; /// | Memory | `storage-memory` | `memory` | /// | S3 | `storage-s3` | `s3`, `s3a`| /// | GCS | `storage-gcs` | `gs`, `gcs`| +/// | AZBLOB | `storage-azblob` | `azblob` | #[derive(Clone, Debug)] pub struct FileIO { builder: FileIOBuilder, diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 215ea67c22..c8442c8158 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -89,6 +89,11 @@ mod storage_gcs; #[cfg(feature = "storage-gcs")] pub use storage_gcs::*; +#[cfg(feature = "storage-azblob")] +mod storage_azblob; +#[cfg(feature = "storage-azblob")] +pub use storage_azblob::*; + fn is_truthy(value: &str) -> bool { ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) } diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 605b317252..29d77473d7 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use opendal::layers::RetryLayer; +#[cfg(feature = "storage-azblob")] +use opendal::services::AzblobConfig; #[cfg(feature = "storage-gcs")] use opendal::services::GcsConfig; #[cfg(feature = "storage-s3")] @@ -47,6 +49,9 @@ pub(crate) enum Storage { }, #[cfg(feature = "storage-gcs")] Gcs { config: Arc }, + + #[cfg(feature = "storage-azblob")] + Azblob { config: Arc }, } impl Storage { @@ -70,6 +75,10 @@ impl Storage { Scheme::Gcs => Ok(Self::Gcs { config: super::gcs_config_parse(props)?.into(), }), + #[cfg(feature = "storage-azblob")] + Scheme::Azblob => Ok(Self::Azblob { + config: super::azblob_config_parse(props)?.into(), + }), // Update doc on [`FileIO`] when adding new schemes. _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -147,10 +156,24 @@ impl Storage { )) } } + #[cfg(feature = "storage-azblob")] + Storage::Azblob { config } => { + let operator = super::azblob_config_build(config, path)?; + let prefix = format!("azblob://{}/", operator.info().name()); + if path.starts_with(&prefix) { + Ok((operator, &path[prefix.len()..])) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid azblob url: {}, should start with {}", path, prefix), + )) + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), - not(feature = "storage-gcs") + not(feature = "storage-gcs"), + not(feature = "storage-azblob") ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/iceberg/src/io/storage_azblob.rs b/crates/iceberg/src/io/storage_azblob.rs new file mode 100644 index 0000000000..d0069e7bd9 --- /dev/null +++ b/crates/iceberg/src/io/storage_azblob.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +//! Azure blob storage properties + +use std::collections::HashMap; + +use opendal::services::AzblobConfig; +use opendal::Operator; +use url::Url; + +use crate::{Error, ErrorKind, Result}; + +/// Azure blob account name. +pub const AZBLOB_ACCOUNT_NAME: &str = "azblob.account-name"; +/// Azure blob account key. +pub const AZBLOB_ACCOUNT_KEY: &str = "azblob.account-key"; +/// Azure blob account endpoint. +pub const AZBLOB_ENDPOINT: &str = "azblob.endpoint"; + +/// Parse iceberg properties to [`AzblobConfig`]. +pub(crate) fn azblob_config_parse(mut m: HashMap) -> Result { + let mut cfg = AzblobConfig::default(); + + if let Some(account_name) = m.remove(AZBLOB_ACCOUNT_NAME) { + cfg.account_name = Some(account_name); + }; + if let Some(account_key) = m.remove(AZBLOB_ACCOUNT_KEY) { + cfg.account_key = Some(account_key); + }; + if let Some(endpoint) = m.remove(AZBLOB_ENDPOINT) { + cfg.endpoint = Some(endpoint); + }; + + Ok(cfg) +} + +/// Build a new OpenDAL [`Operator`] based on a provided [`AzblobConfig`]. +pub(crate) fn azblob_config_build(cfg: &AzblobConfig, path: &str) -> Result { + let url = Url::parse(path)?; + let container = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid azblob url: {}, container is required", path), + ) + })?; + + let mut cfg = cfg.clone(); + cfg.container = container.to_string(); + Ok(Operator::from_config(cfg)?.finish()) +}