|
15 | 15 | * along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | * |
17 | 17 | */ |
18 | | -use super::object_storage::parseable_json_path; |
19 | | -use super::{ |
20 | | - ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, |
21 | | - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, |
| 18 | + |
| 19 | +use std::{ |
| 20 | + collections::{BTreeMap, HashSet}, |
| 21 | + path::Path, |
| 22 | + sync::Arc, |
| 23 | + time::{Duration, Instant}, |
22 | 24 | }; |
| 25 | + |
23 | 26 | use async_trait::async_trait; |
24 | 27 | use bytes::Bytes; |
25 | | -use datafusion::datasource::listing::ListingTableUrl; |
26 | | -use datafusion::datasource::object_store::{ |
27 | | - DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl, |
| 28 | +use datafusion::{ |
| 29 | + datasource::listing::ListingTableUrl, |
| 30 | + execution::{ |
| 31 | + object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl}, |
| 32 | + runtime_env::RuntimeEnvBuilder, |
| 33 | + }, |
| 34 | +}; |
| 35 | +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; |
| 36 | +use object_store::{ |
| 37 | + azure::{MicrosoftAzure, MicrosoftAzureBuilder}, |
| 38 | + limit::LimitStore, |
| 39 | + path::Path as StorePath, |
| 40 | + BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig, |
28 | 41 | }; |
29 | | -use datafusion::execution::runtime_env::RuntimeEnvBuilder; |
30 | | -use futures::stream::FuturesUnordered; |
31 | | -use futures::{StreamExt, TryStreamExt}; |
32 | | -use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder}; |
33 | | -use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig}; |
34 | 42 | use relative_path::{RelativePath, RelativePathBuf}; |
35 | | -use std::path::Path as StdPath; |
36 | 43 | use tracing::{error, info}; |
37 | 44 | use url::Url; |
38 | 45 |
|
39 | | -use super::metrics_layer::MetricLayer; |
40 | | -use crate::handlers::http::users::USERS_ROOT_DIR; |
41 | | -use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME; |
42 | | -use crate::metrics::storage::StorageMetrics; |
43 | | -use crate::parseable::LogStream; |
44 | | -use object_store::limit::LimitStore; |
45 | | -use object_store::path::Path as StorePath; |
46 | | -use std::collections::{BTreeMap, HashMap, HashSet}; |
47 | | -use std::sync::Arc; |
48 | | -use std::time::{Duration, Instant}; |
| 46 | +use crate::{ |
| 47 | + handlers::http::users::USERS_ROOT_DIR, |
| 48 | + metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, |
| 49 | + parseable::LogStream, |
| 50 | +}; |
49 | 51 |
|
50 | | -const CONNECT_TIMEOUT_SECS: u64 = 5; |
51 | | -const REQUEST_TIMEOUT_SECS: u64 = 300; |
| 52 | +use super::{ |
| 53 | + metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, |
| 54 | + ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, |
| 55 | + PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, |
| 56 | + STREAM_ROOT_DIRECTORY, |
| 57 | +}; |
52 | 58 |
|
53 | 59 | #[derive(Debug, Clone, clap::Args)] |
54 | 60 | #[command( |
@@ -161,7 +167,7 @@ impl ObjectStorageProvider for AzureBlobConfig { |
161 | 167 | let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); |
162 | 168 | let azure = MetricLayer::new(azure); |
163 | 169 |
|
164 | | - let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new(); |
| 170 | + let object_store_registry = DefaultObjectStoreRegistry::new(); |
165 | 171 | let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account)) |
166 | 172 | .unwrap(); |
167 | 173 | object_store_registry.register_store(url.as_ref(), Arc::new(azure)); |
@@ -190,10 +196,6 @@ impl ObjectStorageProvider for AzureBlobConfig { |
190 | 196 | } |
191 | 197 | } |
192 | 198 |
|
193 | | -pub fn to_object_store_path(path: &RelativePath) -> StorePath { |
194 | | - StorePath::from(path.as_str()) |
195 | | -} |
196 | | - |
197 | 199 | // ObjStoreClient is generic client to enable interactions with different cloudprovider's |
198 | 200 | // object store such as S3 and Azure Blob |
199 | 201 | #[derive(Debug)] |
@@ -347,7 +349,7 @@ impl BlobStore { |
347 | 349 | } |
348 | 350 | Ok(result_file_list) |
349 | 351 | } |
350 | | - async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { |
| 352 | + async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { |
351 | 353 | let instant = Instant::now(); |
352 | 354 |
|
353 | 355 | // // TODO: Uncomment this when multipart is fixed |
@@ -376,7 +378,7 @@ impl BlobStore { |
376 | 378 | } |
377 | 379 |
|
378 | 380 | // TODO: introduce parallel, multipart-uploads if required |
379 | | - // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { |
| 381 | + // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { |
380 | 382 | // let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2]; |
381 | 383 | // let mut file = OpenOptions::new().read(true).open(path).await?; |
382 | 384 |
|
@@ -623,7 +625,7 @@ impl ObjectStorage for BlobStore { |
623 | 625 | Ok(files) |
624 | 626 | } |
625 | 627 |
|
626 | | - async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { |
| 628 | + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { |
627 | 629 | self._upload_file(key, path).await?; |
628 | 630 |
|
629 | 631 | Ok(()) |
@@ -663,126 +665,21 @@ impl ObjectStorage for BlobStore { |
663 | 665 | .collect::<Vec<_>>()) |
664 | 666 | } |
665 | 667 |
|
666 | | - async fn get_all_dashboards( |
667 | | - &self, |
668 | | - ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> { |
669 | | - let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new(); |
670 | | - let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); |
671 | | - let resp = self |
672 | | - .client |
673 | | - .list_with_delimiter(Some(&users_root_path)) |
674 | | - .await?; |
675 | | - |
676 | | - let users = resp |
677 | | - .common_prefixes |
678 | | - .iter() |
679 | | - .flat_map(|path| path.parts()) |
680 | | - .filter(|name| name.as_ref() != USERS_ROOT_DIR) |
681 | | - .map(|name| name.as_ref().to_string()) |
682 | | - .collect::<Vec<_>>(); |
683 | | - for user in users { |
684 | | - let user_dashboard_path = |
685 | | - object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); |
686 | | - let dashboards_path = RelativePathBuf::from(&user_dashboard_path); |
687 | | - let dashboard_bytes = self |
688 | | - .get_objects( |
689 | | - Some(&dashboards_path), |
690 | | - Box::new(|file_name| file_name.ends_with(".json")), |
691 | | - ) |
692 | | - .await?; |
693 | | - |
694 | | - dashboards |
695 | | - .entry(dashboards_path) |
696 | | - .or_default() |
697 | | - .extend(dashboard_bytes); |
698 | | - } |
699 | | - Ok(dashboards) |
700 | | - } |
701 | | - |
702 | | - async fn get_all_saved_filters( |
| 668 | + async fn list_dirs_relative( |
703 | 669 | &self, |
704 | | - ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> { |
705 | | - let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new(); |
706 | | - let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); |
707 | | - let resp = self |
708 | | - .client |
709 | | - .list_with_delimiter(Some(&users_root_path)) |
710 | | - .await?; |
| 670 | + relative_path: &RelativePath, |
| 671 | + ) -> Result<Vec<String>, ObjectStorageError> { |
| 672 | + let prefix = object_store::path::Path::from(relative_path.as_str()); |
| 673 | + let resp = self.client.list_with_delimiter(Some(&prefix)).await?; |
711 | 674 |
|
712 | | - let users = resp |
| 675 | + Ok(resp |
713 | 676 | .common_prefixes |
714 | 677 | .iter() |
715 | 678 | .flat_map(|path| path.parts()) |
716 | | - .filter(|name| name.as_ref() != USERS_ROOT_DIR) |
717 | 679 | .map(|name| name.as_ref().to_string()) |
718 | | - .collect::<Vec<_>>(); |
719 | | - for user in users { |
720 | | - let user_filters_path = |
721 | | - object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); |
722 | | - let resp = self |
723 | | - .client |
724 | | - .list_with_delimiter(Some(&user_filters_path)) |
725 | | - .await?; |
726 | | - let streams = resp |
727 | | - .common_prefixes |
728 | | - .iter() |
729 | | - .filter(|name| name.as_ref() != USERS_ROOT_DIR) |
730 | | - .map(|name| name.as_ref().to_string()) |
731 | | - .collect::<Vec<_>>(); |
732 | | - for stream in streams { |
733 | | - let filters_path = RelativePathBuf::from(&stream); |
734 | | - let filter_bytes = self |
735 | | - .get_objects( |
736 | | - Some(&filters_path), |
737 | | - Box::new(|file_name| file_name.ends_with(".json")), |
738 | | - ) |
739 | | - .await?; |
740 | | - filters |
741 | | - .entry(filters_path) |
742 | | - .or_default() |
743 | | - .extend(filter_bytes); |
744 | | - } |
745 | | - } |
746 | | - Ok(filters) |
| 680 | + .collect::<Vec<_>>()) |
747 | 681 | } |
748 | 682 |
|
749 | | - ///fetch all correlations uploaded in object store |
750 | | - /// return the correlation file path and all correlation json bytes for each file path |
751 | | - async fn get_all_correlations( |
752 | | - &self, |
753 | | - ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> { |
754 | | - let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new(); |
755 | | - let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); |
756 | | - let resp = self |
757 | | - .client |
758 | | - .list_with_delimiter(Some(&users_root_path)) |
759 | | - .await?; |
760 | | - |
761 | | - let users = resp |
762 | | - .common_prefixes |
763 | | - .iter() |
764 | | - .flat_map(|path| path.parts()) |
765 | | - .filter(|name| name.as_ref() != USERS_ROOT_DIR) |
766 | | - .map(|name| name.as_ref().to_string()) |
767 | | - .collect::<Vec<_>>(); |
768 | | - for user in users { |
769 | | - let user_correlation_path = |
770 | | - object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations")); |
771 | | - let correlations_path = RelativePathBuf::from(&user_correlation_path); |
772 | | - let correlation_bytes = self |
773 | | - .get_objects( |
774 | | - Some(&correlations_path), |
775 | | - Box::new(|file_name| file_name.ends_with(".json")), |
776 | | - ) |
777 | | - .await?; |
778 | | - |
779 | | - correlations |
780 | | - .entry(correlations_path) |
781 | | - .or_default() |
782 | | - .extend(correlation_bytes); |
783 | | - } |
784 | | - Ok(correlations) |
785 | | - } |
786 | 683 | fn get_bucket_name(&self) -> String { |
787 | 684 | self.container.clone() |
788 | 685 | } |
|
0 commit comments