diff --git a/resources/formats.json b/resources/formats.json index 18dad8ae6..ad5efae39 100644 --- a/resources/formats.json +++ b/resources/formats.json @@ -1466,8 +1466,22 @@ ] }, { - "name": "rust_server_logs", + "name": "parseable_server_logs", "regex": [ + { + "pattern": "^(?P<customer_id>\\S+)\\s+(?P<deployment_id>\\S+)\\s+(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z?)\\s+(?P<level>\\w+)\\s+(?P<logger_context>\\S+)\\s+(?P<thread_id>ThreadId\\(\\d+\\))\\s+(?P<module>.*?):(?P<line_number>\\d+):\\s+(?P<body>.*)", + "fields": [ + "customer_id", + "deployment_id", + "timestamp", + "level", + "logger_context", + "thread_id", + "module", + "line_number", + "body" + ] + }, { "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s+(?P<level>\\w+)\\s+(?P<logger_context>\\S+)\\s+(?P<thread_id>ThreadId\\(\\d+\\))\\s+(?P<module>.*?):(?P<line_number>\\d+):\\s+(?P<body>.*)", "fields": [ diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 5c8c411a2..457a48702 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -193,17 +193,17 @@ fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime<Utc>) let events_ingested = EVENTS_INGESTED_DATE .get_metric_with_label_values(&event_labels) - .map(|metric| metric.get() as u64) + .map(|metric| metric.get()) .unwrap_or(0); let ingestion_size = EVENTS_INGESTED_SIZE_DATE .get_metric_with_label_values(&event_labels) - .map(|metric| metric.get() as u64) + .map(|metric| metric.get()) .unwrap_or(0); let storage_size = EVENTS_STORAGE_SIZE_DATE .get_metric_with_label_values(&storage_labels) - .map(|metric| metric.get() as u64) + .map(|metric| metric.get()) .unwrap_or(0); (events_ingested, ingestion_size, storage_size) diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs index d231dc2b2..b5a155972 100644 --- a/src/event/format/known_schema.rs +++ b/src/event/format/known_schema.rs @@ -515,25 +515,19 @@ mod tests { } #[test] - fn test_rust_server_logs() { + fn test_parseable_server_logs() { let processor = EventProcessor::new(FORMATS_JSON); let schema = processor .schema_definitions - .get("rust_server_logs") + .get("parseable_server_logs") .unwrap(); let test_logs = vec![ // Current parseable format with ThreadId - "2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live", - "2025-09-06T10:44:12.62276265Z ERROR actix-rt|system:0|arbiter:17 ThreadId(163) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named a. 
Valid fields are serverlogs.log\")", - "2025-09-06T05:16:46.092071318Z ERROR actix-rt|system:0|arbiter:21 ThreadId(167) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named ansible.host.ip\")", - "2025-09-06T11:22:07.500864363Z WARN main ThreadId(01) parseable_enterprise:70: Received shutdown signal, notifying server to shut down...", - // env_logger format - "[2025-09-06T10:43:01.628980875Z INFO parseable::storage] Initializing storage backend", - "[2025-09-06T10:43:01.628980875Z ERROR parseable::http::ingest] Failed to parse JSON", - // Simple tracing format (no ThreadId) - "2025-09-06T10:43:01.628980875Z INFO parseable::storage::s3: Storage configured successfully", - "2025-09-06T10:43:01.628980875Z DEBUG parseable::query::engine: Query executed in 45ms", + "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live", + "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:44:12.62276265Z ERROR actix-rt|system:0|arbiter:17 ThreadId(163) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named a. Valid fields are serverlogs.log\")", + "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T05:16:46.092071318Z ERROR actix-rt|system:0|arbiter:21 ThreadId(167) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named ansible.host.ip\")", + "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T11:22:07.500864363Z WARN main ThreadId(01) parseable_enterprise:70: Received shutdown signal, notifying server to shut down...", ]; for (i, log_text) in test_logs.iter().enumerate() { diff --git a/src/event/mod.rs b/src/event/mod.rs index 4da88de1a..d5a0ef25d 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -28,6 +28,7 @@ use self::error::EventError; use crate::{ LOCK_EXPECT, metadata::update_stats, + metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date}, parseable::{PARSEABLE, StagingError}, storage::StreamType, }; @@ -88,6 +89,11 @@ impl Event { self.parsed_timestamp.date(), ); + // Track billing metrics for event ingestion + let date_string = self.parsed_timestamp.date().to_string(); + increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string); + increment_events_ingested_size_by_date(self.origin_size, &date_string); + crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb); Ok(()) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 96553b06c..628bd9f0f 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -116,8 +116,6 @@ impl ParseableServer for IngestServer { }) .await; - PARSEABLE.storage.register_store_metrics(prometheus); - migration::run_migration(&PARSEABLE).await?; // local sync on init diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 5e6fa0860..c551884d4 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -106,7 +106,6 @@ impl ParseableServer for QueryServer { prometheus: &PrometheusMetrics, shutdown_rx: oneshot::Receiver<()>, ) -> anyhow::Result<()> { - PARSEABLE.storage.register_store_metrics(prometheus); // write the ingestor metadata to storage QUERIER_META .get_or_init(|| async { diff --git 
a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 7d1d46ee9..a522697aa 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -128,8 +128,6 @@ impl ParseableServer for Server { prometheus: &PrometheusMetrics, shutdown_rx: oneshot::Receiver<()>, ) -> anyhow::Result<()> { - PARSEABLE.storage.register_store_metrics(prometheus); - migration::run_migration(&PARSEABLE).await?; // load on init diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 014bed163..a9dd88b43 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -46,7 +46,7 @@ use tokio::task::JoinSet; use tracing::{error, warn}; use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema}; -use crate::metrics::QUERY_EXECUTE_TIME; +use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date}; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::query::error::ExecuteError; use crate::query::{CountsRequest, Query as LogicalQuery, execute}; @@ -123,6 +123,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result` // we use the `get_bin_density` method to get the count of records in the dataset // instead of executing the query using datafusion @@ -348,7 +352,9 @@ pub async fn get_counts( // does user have access to table? user_auth_for_datasets(&permissions, std::slice::from_ref(&body.stream)).await?; - + // Track billing metrics for query calls + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_query_calls_by_date(¤t_date); // if the user has given a sql query (counts call with filters applied), then use this flow // this could include filters or group by if body.conditions.is_some() { diff --git a/src/metadata.rs b/src/metadata.rs index 1e7061bfb..e49fc2119 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -46,13 +46,13 @@ pub fn update_stats( .add(num_rows as i64); EVENTS_INGESTED_DATE .with_label_values(&[stream_name, origin, &parsed_date]) - .add(num_rows as i64); + .inc_by(num_rows as u64); EVENTS_INGESTED_SIZE .with_label_values(&[stream_name, origin]) .add(size as i64); EVENTS_INGESTED_SIZE_DATE .with_label_values(&[stream_name, origin, &parsed_date]) - .add(size as i64); + .inc_by(size); LIFETIME_EVENTS_INGESTED .with_label_values(&[stream_name, origin]) .add(num_rows as i64); @@ -173,12 +173,12 @@ pub fn load_daily_metrics(manifests: &Vec, stream_name: &str) { let storage_size = manifest.storage_size; EVENTS_INGESTED_DATE .with_label_values(&[stream_name, "json", &manifest_date]) - .set(events_ingested as i64); + .inc_by(events_ingested); EVENTS_INGESTED_SIZE_DATE .with_label_values(&[stream_name, "json", &manifest_date]) - .set(ingestion_size as i64); + .inc_by(ingestion_size); EVENTS_STORAGE_SIZE_DATE .with_label_values(&["data", stream_name, "parquet", &manifest_date]) - .set(storage_size as i64); + .inc_by(storage_size); } } diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 1896bce0c..56f3d98b9 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -17,8 +17,6 @@ */ pub mod prom_utils; -pub mod storage; - use crate::{handlers::http::metrics_path, stats::FullStats}; use actix_web::Responder; use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder}; @@ -30,7 +28,7 @@ pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME"); pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), + Opts::new("events_ingested", "Events ingested for a 
stream").namespace(METRICS_NAMESPACE), &["stream", "format"], ) .expect("metric can be created") @@ -38,8 +36,11 @@ pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("events_ingested_size", "Events ingested size bytes") - .namespace(METRICS_NAMESPACE), + Opts::new( + "events_ingested_size", + "Events ingested size bytes for a stream", + ) + .namespace(METRICS_NAMESPACE), &["stream", "format"], ) .expect("metric can be created") @@ -47,7 +48,7 @@ pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { pub static STORAGE_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), + Opts::new("storage_size", "Storage size bytes for a stream").namespace(METRICS_NAMESPACE), &["type", "stream", "format"], ) .expect("metric can be created") @@ -55,7 +56,7 @@ pub static STORAGE_SIZE: Lazy = Lazy::new(|| { pub static EVENTS_DELETED: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE), + Opts::new("events_deleted", "Events deleted for a stream").namespace(METRICS_NAMESPACE), &["stream", "format"], ) .expect("metric can be created") @@ -63,7 +64,11 @@ pub static EVENTS_DELETED: Lazy = Lazy::new(|| { pub static EVENTS_DELETED_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("events_deleted_size", "Events deleted size bytes").namespace(METRICS_NAMESPACE), + Opts::new( + "events_deleted_size", + "Events deleted size bytes for a stream", + ) + .namespace(METRICS_NAMESPACE), &["stream", "format"], ) .expect("metric can be created") @@ -73,7 +78,7 @@ pub static DELETED_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new( "deleted_events_storage_size", - "Deleted events storage size bytes", + "Deleted events storage size bytes for a stream", ) .namespace(METRICS_NAMESPACE), &["type", "stream", "format"], @@ -83,8 +88,11 @@ pub static DELETED_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { pub static LIFETIME_EVENTS_INGESTED: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("lifetime_events_ingested", "Lifetime events ingested") - .namespace(METRICS_NAMESPACE), + Opts::new( + "lifetime_events_ingested", + "Lifetime events ingested for a stream", + ) + .namespace(METRICS_NAMESPACE), &["stream", "format"], ) .expect("metric can be created") @@ -94,7 +102,7 @@ pub static LIFETIME_EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new( "lifetime_events_ingested_size", - "Lifetime events ingested size bytes", + "Lifetime events ingested size bytes for a stream", ) .namespace(METRICS_NAMESPACE), &["stream", "format"], @@ -106,7 +114,7 @@ pub static LIFETIME_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new( "lifetime_events_storage_size", - "Lifetime events storage size bytes", + "Lifetime events storage size bytes for a stream", ) .namespace(METRICS_NAMESPACE), &["type", "stream", "format"], @@ -114,11 +122,11 @@ pub static LIFETIME_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { .expect("metric can be created") }); -pub static EVENTS_INGESTED_DATE: Lazy = Lazy::new(|| { - IntGaugeVec::new( +pub static EVENTS_INGESTED_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( Opts::new( "events_ingested_date", - "Events ingested on a particular date", + "Events ingested for a stream on a particular date", ) .namespace(METRICS_NAMESPACE), &["stream", "format", "date"], @@ -126,11 +134,11 @@ pub static EVENTS_INGESTED_DATE: Lazy = Lazy::new(|| { 
.expect("metric can be created") }); -pub static EVENTS_INGESTED_SIZE_DATE: Lazy = Lazy::new(|| { - IntGaugeVec::new( +pub static EVENTS_INGESTED_SIZE_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( Opts::new( "events_ingested_size_date", - "Events ingested size in bytes on a particular date", + "Events ingested size in bytes for a stream on a particular date", ) .namespace(METRICS_NAMESPACE), &["stream", "format", "date"], @@ -138,11 +146,11 @@ pub static EVENTS_INGESTED_SIZE_DATE: Lazy = Lazy::new(|| { .expect("metric can be created") }); -pub static EVENTS_STORAGE_SIZE_DATE: Lazy = Lazy::new(|| { - IntGaugeVec::new( +pub static EVENTS_STORAGE_SIZE_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( Opts::new( "events_storage_size_date", - "Events storage size in bytes on a particular date", + "Events storage size in bytes for a stream on a particular date", ) .namespace(METRICS_NAMESPACE), &["type", "stream", "format", "date"], @@ -182,6 +190,146 @@ pub static ALERTS_STATES: Lazy = Lazy::new(|| { .expect("metric can be created") }); +// Billing Metrics - Counter type metrics for billing/usage tracking +pub static TOTAL_EVENTS_INGESTED_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_events_ingested_by_date", + "Total events ingested by date", + ) + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_EVENTS_INGESTED_SIZE_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_events_ingested_size_by_date", + "Total events ingested size in bytes by date", + ) + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_PARQUETS_STORED_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_parquets_stored_by_date", + "Total parquet files stored by date", + ) + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_PARQUETS_STORED_SIZE_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_parquets_stored_size_by_date", + "Total parquet files stored size in bytes by date", + ) + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_QUERY_CALLS_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new("total_query_calls_by_date", "Total query calls by date") + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_files_scanned_in_query_by_date", + "Total files scanned in queries by date", + ) + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_bytes_scanned_in_query_by_date", + "Total bytes scanned in queries by date", + ) + .namespace(METRICS_NAMESPACE), + &["date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_OBJECT_STORE_CALLS_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_object_store_calls_by_date", + "Total object store calls by date", + ) + .namespace(METRICS_NAMESPACE), + &["provider", "method", "date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy = + Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_files_scanned_in_object_store_calls_by_date", + "Total 
files scanned in object store calls by date", + ) + .namespace(METRICS_NAMESPACE), + &["provider", "method", "date"], + ) + .expect("metric can be created") + }); + +pub static TOTAL_INPUT_LLM_TOKENS_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_input_llm_tokens_by_date", + "Total input LLM tokens used by date", + ) + .namespace(METRICS_NAMESPACE), + &["provider", "model", "date"], + ) + .expect("metric can be created") +}); + +pub static TOTAL_OUTPUT_LLM_TOKENS_BY_DATE: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_output_llm_tokens_by_date", + "Total output LLM tokens used by date", + ) + .namespace(METRICS_NAMESPACE), + &["provider", "model", "date"], + ) + .expect("metric can be created") +}); + +pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("storage_request_response_time", "Storage Request Latency") + .namespace(METRICS_NAMESPACE), + &["provider", "method", "status"], + ) + .expect("metric can be created") +}); + fn custom_metrics(registry: &Registry) { registry .register(Box::new(EVENTS_INGESTED.clone())) @@ -231,6 +379,45 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(ALERTS_STATES.clone())) .expect("metric can be registered"); + // Register billing metrics + registry + .register(Box::new(TOTAL_EVENTS_INGESTED_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_EVENTS_INGESTED_SIZE_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_PARQUETS_STORED_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_PARQUETS_STORED_SIZE_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_QUERY_CALLS_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_OBJECT_STORE_CALLS_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new( + TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), + )) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_INPUT_LLM_TOKENS_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(TOTAL_OUTPUT_LLM_TOKENS_BY_DATE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); } pub fn build_metrics_handler() -> PrometheusMetrics { @@ -290,6 +477,76 @@ pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) { .set(stats.lifetime_stats.storage as i64); } +// Helper functions for tracking billing metrics +pub fn increment_events_ingested_by_date(count: u64, date: &str) { + TOTAL_EVENTS_INGESTED_BY_DATE + .with_label_values(&[date]) + .inc_by(count); +} + +pub fn increment_events_ingested_size_by_date(size: u64, date: &str) { + TOTAL_EVENTS_INGESTED_SIZE_BY_DATE + .with_label_values(&[date]) + .inc_by(size); +} + +pub fn increment_parquets_stored_by_date(date: &str) { + TOTAL_PARQUETS_STORED_BY_DATE + .with_label_values(&[date]) + .inc(); +} + +pub fn increment_parquets_stored_size_by_date(size: u64, date: &str) { + TOTAL_PARQUETS_STORED_SIZE_BY_DATE + .with_label_values(&[date]) + .inc_by(size); 
+} + +pub fn increment_query_calls_by_date(date: &str) { + TOTAL_QUERY_CALLS_BY_DATE.with_label_values(&[date]).inc(); +} + +pub fn increment_files_scanned_in_query_by_date(count: u64, date: &str) { + TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE + .with_label_values(&[date]) + .inc_by(count); +} + +pub fn increment_bytes_scanned_in_query_by_date(bytes: u64, date: &str) { + TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE + .with_label_values(&[date]) + .inc_by(bytes); +} + +pub fn increment_object_store_calls_by_date(provider: &str, method: &str, date: &str) { + TOTAL_OBJECT_STORE_CALLS_BY_DATE + .with_label_values(&[provider, method, date]) + .inc(); +} + +pub fn increment_files_scanned_in_object_store_calls_by_date( + provider: &str, + method: &str, + count: u64, + date: &str, +) { + TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE + .with_label_values(&[provider, method, date]) + .inc_by(count); +} + +pub fn increment_input_llm_tokens_by_date(provider: &str, model: &str, tokens: u64, date: &str) { + TOTAL_INPUT_LLM_TOKENS_BY_DATE + .with_label_values(&[provider, model, date]) + .inc_by(tokens); +} + +pub fn increment_output_llm_tokens_by_date(provider: &str, model: &str, tokens: u64, date: &str) { + TOTAL_OUTPUT_LLM_TOKENS_BY_DATE + .with_label_values(&[provider, model, date]) + .inc_by(tokens); +} + use actix_web::HttpResponse; pub async fn get() -> Result { diff --git a/src/metrics/storage.rs b/src/metrics/storage.rs deleted file mode 100644 index f96a317d9..000000000 --- a/src/metrics/storage.rs +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use actix_web_prometheus::PrometheusMetrics; - -pub trait StorageMetrics { - fn register_metrics(&self, handler: &PrometheusMetrics); -} - -pub mod localfs { - use crate::{metrics::METRICS_NAMESPACE, storage::FSConfig}; - use once_cell::sync::Lazy; - use prometheus::{HistogramOpts, HistogramVec}; - - use super::StorageMetrics; - - pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - impl StorageMetrics for FSConfig { - fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - } - } -} - -pub mod s3 { - use crate::{metrics::METRICS_NAMESPACE, storage::S3Config}; - use once_cell::sync::Lazy; - use prometheus::{HistogramOpts, HistogramVec}; - - use super::StorageMetrics; - - pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("s3_response_time", "S3 Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("query_s3_response_time", "S3 Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - impl StorageMetrics for S3Config { - fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - } - } -} - -pub mod azureblob { - use crate::{metrics::METRICS_NAMESPACE, storage::AzureBlobConfig}; - use once_cell::sync::Lazy; - use prometheus::{HistogramOpts, HistogramVec}; - - use super::StorageMetrics; - - pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("azr_blob_response_time", "AzureBlob Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("query_azr_blob_response_time", "AzureBlob Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - impl StorageMetrics for AzureBlobConfig { - fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - } - } -} - -pub mod gcs { - use crate::{metrics::METRICS_NAMESPACE, storage::GcsConfig}; - use once_cell::sync::Lazy; - use prometheus::{HistogramOpts, HistogramVec}; - - use super::StorageMetrics; - - pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("gcs_response_time", "GCS Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - pub static 
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("query_gcs_response_time", "GCS Request Latency") - .namespace(METRICS_NAMESPACE), - &["method", "status"], - ) - .expect("metric can be created") - }); - - impl StorageMetrics for GcsConfig { - fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - } - } -} diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs index 9ca484ead..989734811 100644 --- a/src/query/listing_table_builder.rs +++ b/src/query/listing_table_builder.rs @@ -16,7 +16,7 @@ * */ -use std::{collections::HashMap, ops::Bound, pin::Pin, sync::Arc}; +use std::{ops::Bound, sync::Arc}; use arrow_schema::Schema; use datafusion::{ @@ -27,9 +27,7 @@ use datafusion::{ error::DataFusionError, logical_expr::col, }; -use futures_util::{Future, TryStreamExt, stream::FuturesUnordered}; use itertools::Itertools; -use object_store::{ObjectMeta, ObjectStore, path::Path}; use crate::{ OBJECT_STORE_DATA_GRANULARITY, event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, @@ -56,7 +54,6 @@ impl ListingTableBuilder { pub async fn populate_via_listing( self, storage: Arc, - client: Arc, time_filters: &[PartialTimeFilter], ) -> Result { // Extract the minimum start time from the time filters. @@ -90,67 +87,28 @@ impl ListingTableBuilder { let prefixes = TimeRange::new(start_time.and_utc(), end_time.and_utc()) .generate_prefixes(OBJECT_STORE_DATA_GRANULARITY); - // Categorizes prefixes into "minute" and general resolve lists. - let mut minute_resolve = HashMap::>::new(); - let mut all_resolve = Vec::new(); + // Build all prefixes as relative paths + let prefixes: Vec<_> = prefixes + .into_iter() + .map(|prefix| { + relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix)) + }) + .collect(); + + // Use storage.list_dirs_relative for all prefixes and flatten results + let mut listing = Vec::new(); for prefix in prefixes { - let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix)); - let prefix = storage.absolute_url(path.as_relative_path()).to_string(); - if let Some(pos) = prefix.rfind("minute") { - let hour_prefix = &prefix[..pos]; - minute_resolve - .entry(hour_prefix.to_owned()) - .or_default() - .push(prefix); - } else { - all_resolve.push(prefix); + match storage.list_dirs_relative(&prefix).await { + Ok(paths) => { + listing.extend(paths.into_iter().map(|p| prefix.join(p).to_string())); + } + Err(e) => { + return Err(DataFusionError::External(Box::new(e))); + } } } - /// Resolve all prefixes asynchronously and collect the object metadata. 
- type ResolveFuture = - Pin, object_store::Error>> + Send>>; - let tasks: FuturesUnordered = FuturesUnordered::new(); - for (listing_prefix, prefixes) in minute_resolve { - let client = Arc::clone(&client); - tasks.push(Box::pin(async move { - let path = Path::from(listing_prefix); - let mut objects = client.list(Some(&path)).try_collect::>().await?; - - objects.retain(|obj| { - prefixes.iter().any(|prefix| { - obj.location - .prefix_matches(&object_store::path::Path::from(prefix.as_ref())) - }) - }); - - Ok(objects) - })); - } - - for prefix in all_resolve { - let client = Arc::clone(&client); - tasks.push(Box::pin(async move { - client - .list(Some(&object_store::path::Path::from(prefix))) - .try_collect::>() - .await - })); - } - - let listing = tasks - .try_collect::>>() - .await - .map_err(|err| DataFusionError::External(Box::new(err)))? - .into_iter() - .flat_map(|res| { - res.into_iter() - .map(|obj| obj.location.to_string()) - .collect::>() - }) - .sorted() - .rev() - .collect_vec(); + let listing = listing.into_iter().sorted().rev().collect_vec(); Ok(Self { stream: self.stream, diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index aa25c9926..b3907a9e9 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -20,7 +20,6 @@ use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc}; use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef, SortOptions}; -use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc}; use datafusion::{ catalog::{SchemaProvider, Session}, @@ -45,21 +44,22 @@ use datafusion::{ prelude::Expr, scalar::ScalarValue, }; -use futures_util::{StreamExt, TryFutureExt, TryStreamExt, stream::FuturesOrdered}; +use futures_util::TryFutureExt; use itertools::Itertools; -use object_store::{ObjectStore, path::Path}; -use url::Url; use crate::{ catalog::{ ManifestFile, Snapshot as CatalogSnapshot, column::{Column, TypedStatistics}, - manifest::{File, Manifest}, + manifest::File, snapshot::{ManifestItem, Snapshot}, }, event::DEFAULT_TIMESTAMP_KEY, hottier::HotTierManager, - metrics::QUERY_CACHE_HIT, + metrics::{ + QUERY_CACHE_HIT, increment_bytes_scanned_in_query_by_date, + increment_files_scanned_in_query_by_date, + }, option::Mode, parseable::{PARSEABLE, STREAM_EXISTS}, storage::{ObjectStorage, ObjectStoreFormat}, @@ -91,7 +91,6 @@ impl SchemaProvider for GlobalSchemaProvider { .expect(STREAM_EXISTS) .get_schema(), stream: name.to_owned(), - url: self.storage.store_url(), }))) } else { Ok(None) @@ -108,8 +107,6 @@ struct StandardTableProvider { schema: SchemaRef, // prefix under which to find snapshot stream: String, - // url to find right instance of object store - url: Url, } impl StandardTableProvider { @@ -276,7 +273,6 @@ impl StandardTableProvider { &self, execution_plans: &mut Vec>, glob_storage: Arc, - object_store: Arc, time_filters: &[PartialTimeFilter], state: &dyn Session, projection: Option<&Vec>, @@ -285,7 +281,7 @@ impl StandardTableProvider { time_partition: Option, ) -> Result<(), DataFusionError> { ListingTableBuilder::new(self.stream.to_owned()) - .populate_via_listing(glob_storage.clone(), object_store, time_filters) + .populate_via_listing(glob_storage.clone(), time_filters) .and_then(|builder| async { let table = builder.build( self.schema.clone(), @@ -327,10 +323,12 @@ impl StandardTableProvider { &self, manifest_files: Vec, ) -> (Vec>, datafusion::common::Statistics) { - let target_partition = num_cpus::get(); + let target_partition: 
usize = num_cpus::get(); let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); let mut column_statistics = HashMap::>::new(); let mut count = 0; + let mut total_file_size = 0u64; + let mut file_count = 0u64; for (index, file) in manifest_files .into_iter() .enumerate() @@ -341,9 +339,14 @@ impl StandardTableProvider { mut file_path, num_rows, columns, + file_size, .. } = file; + // Track billing metrics for files scanned in query + file_count += 1; + total_file_size += file_size; + // object_store::path::Path doesn't automatically deal with Windows path separators // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem // before sending the file path to PartitionedFile @@ -400,6 +403,11 @@ impl StandardTableProvider { column_statistics: statistics, }; + // Track billing metrics for query scan + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_files_scanned_in_query_by_date(file_count, ¤t_date); + increment_bytes_scanned_in_query_by_date(total_file_size, ¤t_date); + (partitioned_files, statistics) } } @@ -487,11 +495,6 @@ impl TableProvider for StandardTableProvider { limit: Option, ) -> Result, DataFusionError> { let mut execution_plans = vec![]; - let object_store = state - .runtime_env() - .object_store_registry - .get_store(&self.url) - .unwrap(); let glob_storage = PARSEABLE.storage.get_object_store(); let object_store_format: ObjectStoreFormat = serde_json::from_slice( @@ -548,7 +551,6 @@ impl TableProvider for StandardTableProvider { self.legacy_listing_table( &mut execution_plans, glob_storage.clone(), - object_store.clone(), &listing_time_filter, state, projection, @@ -857,34 +859,15 @@ fn extract_timestamp_bound( DateTime::from_timestamp_nanos(*value).naive_utc(), )), ScalarValue::Utf8(Some(str_value)) if is_time_partition => { - Some((binexpr.op, str_value.parse::().unwrap())) + match str_value.parse::() { + Ok(dt) => Some((binexpr.op, dt)), + Err(_) => None, + } } _ => None, } } -pub async fn collect_manifest_files( - storage: Arc, - manifest_urls: Vec, -) -> Result, object_store::Error> { - let tasks = manifest_urls.into_iter().map(|path| { - let path = Path::parse(path).unwrap(); - let storage = Arc::clone(&storage); - async move { storage.get(&path).await } - }); - - let resp = FuturesOrdered::from_iter(tasks) - .and_then(|res| res.bytes()) - .collect::>>() - .await; - - Ok(resp - .into_iter() - .flat_map(|res| res.ok()) - .map(|bytes| serde_json::from_slice(&bytes).unwrap()) - .collect()) -} - // Extract start time and end time from filter predicate pub fn extract_primary_filter( filters: &[Expr], diff --git a/src/stats.rs b/src/stats.rs index 5a167cc39..ced7b10b6 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -20,9 +20,9 @@ use std::collections::HashMap; use std::sync::Arc; use once_cell::sync::Lazy; -use prometheus::IntGaugeVec; use prometheus::core::Collector; use prometheus::proto::MetricFamily; +use prometheus::{IntCounterVec, IntGaugeVec}; use tracing::warn; use crate::metrics::{ @@ -136,6 +136,7 @@ pub async fn update_deleted_stats( "parquet", &manifest_date, ]); + num_row += manifest.events_ingested as i64; ingestion_size += manifest.ingestion_size as i64; storage_size += manifest.storage_size as i64; @@ -197,7 +198,7 @@ fn remove_label_values(lazy_static: &Lazy, event_labels: &[&str]) { } } -fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { +fn delete_with_label_prefix(metrics: &IntCounterVec, prefix: &[&str]) { let families: Vec = 
metrics.collect().into_iter().collect(); for metric in families.iter().flat_map(|m| m.get_metric()) { let label_map: HashMap<&str, &str> = metric diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index acd867361..32b3cb1c5 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -19,12 +19,16 @@ use std::{ collections::HashSet, path::Path, - sync::Arc, - time::{Duration, Instant}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, }; use async_trait::async_trait; use bytes::Bytes; +use chrono::Utc; use datafusion::{ datasource::listing::ListingTableUrl, execution::{ @@ -42,11 +46,13 @@ use object_store::{ }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; -use tracing::{error, info}; +use tracing::error; use url::Url; use crate::{ - metrics::storage::{StorageMetrics, azureblob::REQUEST_RESPONSE_TIME}, + metrics::{ + increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + }, parseable::LogStream, }; @@ -166,7 +172,7 @@ impl ObjectStorageProvider for AzureBlobConfig { let azure = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); - let azure = MetricLayer::new(azure); + let azure = MetricLayer::new(azure, "azure_blob"); let object_store_registry = DefaultObjectStoreRegistry::new(); let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account)) @@ -191,10 +197,6 @@ impl ObjectStorageProvider for AzureBlobConfig { fn get_endpoint(&self) -> String { self.endpoint_url.clone() } - - fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - self.register_metrics(handler) - } } // ObjStoreClient is generic client to enable interactions with different cloudprovider's @@ -209,25 +211,25 @@ pub struct BlobStore { impl BlobStore { async fn _get_object(&self, path: &RelativePath) -> Result { - let instant = Instant::now(); let resp = self.client.get(&to_object_store_path(path)).await; + increment_object_store_calls_by_date( + "azure_blob", + "GET", + &Utc::now().date_naive().to_string(), + ); match resp { Ok(resp) => { - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(time); - let body = resp.bytes().await.unwrap(); + let body: Bytes = resp.bytes().await?; + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); Ok(body) } - Err(err) => { - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "400"]) - .observe(time); - Err(err.into()) - } + Err(err) => Err(err.into()), } } @@ -236,52 +238,105 @@ impl BlobStore { path: &RelativePath, resource: PutPayload, ) -> Result<(), ObjectStorageError> { - let time = Instant::now(); let resp = self.client.put(&to_object_store_path(path), resource).await; - let status = if resp.is_ok() { "200" } else { "400" }; - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["PUT", status]) - .observe(time); - - if let Err(object_store::Error::NotFound { source, .. 
}) = &resp { - return Err(ObjectStorageError::Custom( - format!("Failed to upload, error: {source:?}").to_string(), - )); + increment_object_store_calls_by_date( + "azure_blob", + "PUT", + &Utc::now().date_naive().to_string(), + ); + match resp { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) + } + Err(err) => Err(err.into()), } - - resp.map(|_| ()).map_err(|err| err.into()) } async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> { + let files_scanned = Arc::new(AtomicU64::new(0)); + let files_deleted = Arc::new(AtomicU64::new(0)); let object_stream = self.client.list(Some(&(key.into()))); + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); object_stream .for_each_concurrent(None, |x| async { + files_scanned.fetch_add(1, Ordering::Relaxed); + match x { Ok(obj) => { - if (self.client.delete(&obj.location).await).is_err() { - error!("Failed to fetch object during delete stream"); + files_deleted.fetch_add(1, Ordering::Relaxed); + let delete_resp = self.client.delete(&obj.location).await; + increment_object_store_calls_by_date( + "azure_blob", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + if delete_resp.is_err() { + error!( + "Failed to delete object during delete stream: {:?}", + delete_resp + ); } } - Err(_) => { - error!("Failed to fetch object during delete stream"); + Err(err) => { + error!("Failed to fetch object during delete stream: {:?}", err); } }; }) .await; + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + files_scanned.load(Ordering::Relaxed), + &Utc::now().date_naive().to_string(), + ); + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "DELETE", + files_deleted.load(Ordering::Relaxed), + &Utc::now().date_naive().to_string(), + ); Ok(()) } async fn _list_dates(&self, stream: &str) -> Result, ObjectStorageError> { - let resp = self + let resp: Result = self .client .list_with_delimiter(Some(&(stream.into()))) - .await?; + .await; + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); + + let resp = match resp { + Ok(resp) => resp, + Err(err) => { + return Err(err.into()); + } + }; let common_prefixes = resp.common_prefixes; + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + // return prefixes at the root level let dates: Vec<_> = common_prefixes .iter() @@ -293,31 +348,26 @@ impl BlobStore { } async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - let instant = Instant::now(); - - // // TODO: Uncomment this when multipart is fixed - // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64; - - let should_multipart = false; - - let res = if should_multipart { - // self._upload_multipart(key, path).await - // this branch will never get executed - Ok(()) - } else { - let bytes = tokio::fs::read(path).await?; - let result = self.client.put(&key.into(), bytes.into()).await?; - info!("Uploaded file to Azure Blob Storage: {:?}", result); - Ok(()) - }; - - let status = if res.is_ok() { "200" } else { "400" }; - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["UPLOAD_PARQUET", status]) - .observe(time); - - res + let bytes = tokio::fs::read(path).await?; + + let 
result = self.client.put(&key.into(), bytes.into()).await; + increment_object_store_calls_by_date( + "azure_blob", + "PUT", + &Utc::now().date_naive().to_string(), + ); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) + } + Err(err) => Err(err.into()), + } } async fn _upload_multipart( @@ -328,14 +378,45 @@ impl BlobStore { let mut file = OpenOptions::new().read(true).open(path).await?; let location = &to_object_store_path(key); - let mut async_writer = self.client.put_multipart(location).await?; + // Track multipart initiation + let async_writer = self.client.put_multipart(location).await; + increment_object_store_calls_by_date( + "azure_blob", + "PUT_MULTIPART", + &Utc::now().date_naive().to_string(), + ); + let mut async_writer = match async_writer { + Ok(writer) => writer, + Err(err) => { + return Err(err.into()); + } + }; let meta = file.metadata().await?; let total_size = meta.len() as usize; if total_size < MIN_MULTIPART_UPLOAD_SIZE { let mut data = Vec::new(); file.read_to_end(&mut data).await?; - self.client.put(location, data.into()).await?; + let result = self.client.put(location, data.into()).await; + increment_object_store_calls_by_date( + "azure_blob", + "PUT", + &Utc::now().date_naive().to_string(), + ); + + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + } + Err(err) => { + return Err(err.into()); + } + } // async_writer.put_part(data.into()).await?; // async_writer.complete().await?; return Ok(()); @@ -349,7 +430,7 @@ impl BlobStore { let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE; let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 }; - // Upload each part + // Upload each part with metrics for part_number in 0..(total_parts) { let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE; let end_pos = if part_number == num_full_parts && has_final_partial_part { @@ -363,72 +444,33 @@ impl BlobStore { // Extract this part's data let part_data = data[start_pos..end_pos].to_vec(); - // Upload the part - async_writer.put_part(part_data.into()).await?; + let result = async_writer.put_part(part_data.into()).await; + increment_object_store_calls_by_date( + "azure_blob", + "PUT_MULTIPART", + &Utc::now().date_naive().to_string(), + ); + if result.is_err() { + return Err(result.err().unwrap().into()); + } // upload_parts.push(part_number as u64 + 1); } - if let Err(err) = async_writer.complete().await { + + // Track multipart completion + let complete_result = async_writer.complete().await; + if let Err(err) = complete_result { error!("Failed to complete multipart upload. {:?}", err); async_writer.abort().await?; - }; + return Err(err.into()); + } } Ok(()) } - - // TODO: introduce parallel, multipart-uploads if required - // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - // let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2]; - // let mut file = OpenOptions::new().read(true).open(path).await?; - - // // let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?; - // let mut async_writer = self.client.put_multipart(&key.into()).await?; - - // /* `abort_multipart()` has been removed */ - // // let close_multipart = |err| async move { - // // error!("multipart upload failed. 
{:?}", err); - // // self.client - // // .abort_multipart(&key.into(), &multipart_id) - // // .await - // // }; - - // loop { - // match file.read(&mut buf).await { - // Ok(len) => { - // if len == 0 { - // break; - // } - // if let Err(err) = async_writer.write_all(&buf[0..len]).await { - // // close_multipart(err).await?; - // break; - // } - // if let Err(err) = async_writer.flush().await { - // // close_multipart(err).await?; - // break; - // } - // } - // Err(err) => { - // // close_multipart(err).await?; - // break; - // } - // } - // } - - // async_writer.shutdown().await?; - - // Ok(()) - // } } #[async_trait] impl ObjectStorage for BlobStore { - async fn upload_multipart( - &self, - key: &RelativePath, - path: &Path, - ) -> Result<(), ObjectStorageError> { - self._upload_multipart(key, path).await - } async fn get_buffered_reader( &self, _path: &RelativePath, @@ -440,13 +482,32 @@ impl ObjectStorage for BlobStore { ), ))) } - async fn head(&self, _path: &RelativePath) -> Result { - Err(ObjectStorageError::UnhandledError(Box::new( - std::io::Error::new( - std::io::ErrorKind::Unsupported, - "Head operation not implemented for Blob Storage yet", - ), - ))) + + async fn upload_multipart( + &self, + key: &RelativePath, + path: &Path, + ) -> Result<(), ObjectStorageError> { + self._upload_multipart(key, path).await + } + + async fn head(&self, path: &RelativePath) -> Result { + let result = self.client.head(&to_object_store_path(path)).await; + increment_object_store_calls_by_date( + "azure_blob", + "HEAD", + &Utc::now().date_naive().to_string(), + ); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn get_object(&self, path: &RelativePath) -> Result { @@ -458,8 +519,6 @@ impl ObjectStorage for BlobStore { base_path: Option<&RelativePath>, filter_func: Box bool + Send>, ) -> Result, ObjectStorageError> { - let instant = Instant::now(); - let prefix = if let Some(base_path) = base_path { to_object_store_path(base_path) } else { @@ -469,8 +528,18 @@ impl ObjectStorage for BlobStore { let mut list_stream = self.client.list(Some(&prefix)); let mut res = vec![]; + let mut files_scanned = 0; + + // Note: We track each streaming list item retrieval + while let Some(meta_result) = list_stream.next().await { + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + return Err(err.into()); + } + }; - while let Some(meta) = list_stream.next().await.transpose()? 
{ + files_scanned += 1; let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); if !ingestor_file { @@ -483,38 +552,70 @@ impl ObjectStorage for BlobStore { .map_err(ObjectStorageError::PathError)?, ) .await?; - + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "azure_blob", + "GET", + &Utc::now().date_naive().to_string(), + ); res.push(byts); } - let instant = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(instant); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); Ok(res) } async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError> { - let time = Instant::now(); let mut path_arr = vec![]; + let mut files_scanned = 0; + let mut object_stream = self.client.list(Some(&self.root)); + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); + + while let Some(meta_result) = object_stream.next().await { + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + return Err(err.into()); + } + }; - while let Some(meta) = object_stream.next().await.transpose()? { + files_scanned += 1; let flag = meta.location.filename().unwrap().starts_with("ingestor"); if flag { path_arr.push(RelativePathBuf::from(meta.location.as_ref())); } } - - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(time); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); Ok(path_arr) } @@ -537,15 +638,44 @@ impl ObjectStorage for BlobStore { } async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { - Ok(self.client.delete(&to_object_store_path(path)).await?) + let result = self.client.delete(&to_object_store_path(path)).await; + increment_object_store_calls_by_date( + "azure_blob", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "DELETE", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn check(&self) -> Result<(), ObjectStorageError> { - Ok(self + let result = self .client .head(&to_object_store_path(&parseable_json_path())) - .await - .map(|_| ())?) + .await; + increment_object_store_calls_by_date( + "azure_blob", + "HEAD", + &Utc::now().date_naive().to_string(), + ); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result.map(|_| ())?) 
} async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { @@ -556,19 +686,24 @@ impl ObjectStorage for BlobStore { async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { let file = RelativePathBuf::from(&node_filename); - match self.client.delete(&to_object_store_path(&file)).await { - Ok(_) => Ok(()), - Err(err) => { - // if the object is not found, it is not an error - // the given url path was incorrect - if matches!(err, object_store::Error::NotFound { .. }) { - error!("Node does not exist"); - Err(err.into()) - } else { - error!("Error deleting node meta file: {:?}", err); - Err(err.into()) - } + + let result = self.client.delete(&to_object_store_path(&file)).await; + increment_object_store_calls_by_date( + "azure_blob", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "DELETE", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) } + Err(err) => Err(err.into()), } } @@ -583,7 +718,17 @@ impl ObjectStorage for BlobStore { let resp = self.client.list_with_delimiter(None).await?; let common_prefixes = resp.common_prefixes; // get all dirs - + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); // return prefixes at the root level let dirs: HashSet<_> = common_prefixes .iter() @@ -596,13 +741,26 @@ impl ObjectStorage for BlobStore { for dir in &dirs { let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}"); - let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; + let task = async move { + let result = self.client.head(&StorePath::from(key)).await; + increment_object_store_calls_by_date( + "azure_blob", + "HEAD", + &Utc::now().date_naive().to_string(), + ); + result.map(|_| ()) + }; stream_json_check.push(task); } - + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "HEAD", + dirs.len() as u64, + &Utc::now().date_naive().to_string(), + ); stream_json_check.try_collect::<()>().await?; - Ok(dirs.into_iter().collect()) + Ok(dirs) } async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { @@ -618,8 +776,19 @@ impl ObjectStorage for BlobStore { ) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; - - let hours = resp + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); + + let hours: Vec = resp .common_prefixes .iter() .filter_map(|path| { @@ -647,8 +816,18 @@ impl ObjectStorage for BlobStore { ) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; - - let minutes = resp + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); + let 
minutes: Vec = resp .common_prefixes .iter() .filter_map(|path| { @@ -669,19 +848,8 @@ impl ObjectStorage for BlobStore { Ok(minutes) } - // async fn list_manifest_files( - // &self, - // stream_name: &str, - // ) -> Result>, ObjectStorageError> { - // let files = self._list_manifest_files(stream_name).await?; - - // Ok(files) - // } - async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - self._upload_file(key, path).await?; - - Ok(()) + Ok(self._upload_file(key, path).await?) } fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path { @@ -708,7 +876,28 @@ impl ObjectStorage for BlobStore { async fn list_dirs(&self) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from("/"); - let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let resp = self.client.list_with_delimiter(Some(&pre)).await; + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); + let resp = match resp { + Ok(resp) => { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + + resp + } + Err(err) => { + return Err(err.into()); + } + }; Ok(resp .common_prefixes @@ -723,7 +912,27 @@ impl ObjectStorage for BlobStore { relative_path: &RelativePath, ) -> Result, ObjectStorageError> { let prefix = object_store::path::Path::from(relative_path.as_str()); - let resp = self.client.list_with_delimiter(Some(&prefix)).await?; + let resp = self.client.list_with_delimiter(Some(&prefix)).await; + increment_object_store_calls_by_date( + "azure_blob", + "LIST", + &Utc::now().date_naive().to_string(), + ); + let resp = match resp { + Ok(resp) => { + increment_files_scanned_in_object_store_calls_by_date( + "azure_blob", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + + resp + } + Err(err) => { + return Err(err.into()); + } + }; Ok(resp .common_prefixes diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 307359b51..93c7a954f 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -19,16 +19,22 @@ use std::{ collections::HashSet, path::Path, - sync::Arc, - time::{Duration, Instant}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, }; use crate::{ - metrics::storage::{StorageMetrics, gcs::REQUEST_RESPONSE_TIME}, + metrics::{ + increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + }, parseable::LogStream, }; use async_trait::async_trait; use bytes::Bytes; +use chrono::Utc; use datafusion::{ datasource::listing::ListingTableUrl, execution::{ @@ -46,7 +52,7 @@ use object_store::{ }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; -use tracing::{error, info}; +use tracing::error; use super::{ CONNECT_TIMEOUT_SECS, MIN_MULTIPART_UPLOAD_SIZE, ObjectStorage, ObjectStorageError, @@ -128,7 +134,7 @@ impl ObjectStorageProvider for GcsConfig { // limit objectstore to a concurrent request limit let gcs = LimitStore::new(gcs, super::MAX_OBJECT_STORE_REQUESTS); - let gcs = MetricLayer::new(gcs); + let gcs = MetricLayer::new(gcs, "gcs"); let object_store_registry = DefaultObjectStoreRegistry::new(); // Register GCS client under the "gs://" scheme so DataFusion can route @@ -153,10 +159,6 @@ impl ObjectStorageProvider for GcsConfig { format!("{}/{}", self.endpoint_url, self.bucket_name) } - fn register_store_metrics(&self, handler: 
&actix_web_prometheus::PrometheusMetrics) { - self.register_metrics(handler); - } - fn get_object_store(&self) -> Arc { static STORE: once_cell::sync::OnceCell> = once_cell::sync::OnceCell::new(); @@ -174,26 +176,20 @@ pub struct Gcs { impl Gcs { async fn _get_object(&self, path: &RelativePath) -> Result { - let instant = Instant::now(); - let resp = self.client.get(&to_object_store_path(path)).await; - + increment_object_store_calls_by_date("gcs", "GET", &Utc::now().date_naive().to_string()); match resp { Ok(resp) => { - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(time); - let body = resp.bytes().await.unwrap(); + let body: Bytes = resp.bytes().await?; + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); Ok(body) } - Err(err) => { - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "400"]) - .observe(time); - Err(err.into()) - } + Err(err) => Err(err.into()), } } @@ -202,83 +198,93 @@ impl Gcs { path: &RelativePath, resource: PutPayload, ) -> Result<(), ObjectStorageError> { - let time = Instant::now(); let resp = self.client.put(&to_object_store_path(path), resource).await; - let status = if resp.is_ok() { "200" } else { "400" }; - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["PUT", status]) - .observe(time); - - if let Err(object_store::Error::NotFound { source, .. }) = &resp { - let source_str = source.to_string(); - if source_str.contains("NoSuchBucket") { - return Err(ObjectStorageError::Custom( - format!("Bucket '{}' does not exist in GCS.", self.bucket).to_string(), - )); + increment_object_store_calls_by_date("gcs", "PUT", &Utc::now().date_naive().to_string()); + match resp { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) } + Err(err) => Err(err.into()), } - - resp.map(|_| ()).map_err(|err| err.into()) } async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> { + let files_scanned = Arc::new(AtomicU64::new(0)); + let files_deleted = Arc::new(AtomicU64::new(0)); + // Track LIST operation let object_stream = self.client.list(Some(&(key.into()))); - + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); object_stream .for_each_concurrent(None, |x| async { + files_scanned.fetch_add(1, Ordering::Relaxed); + match x { Ok(obj) => { - if (self.client.delete(&obj.location).await).is_err() { - error!("Failed to fetch object during delete stream"); + files_deleted.fetch_add(1, Ordering::Relaxed); + let delete_resp = self.client.delete(&obj.location).await; + increment_object_store_calls_by_date( + "gcs", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + if delete_resp.is_err() { + error!( + "Failed to delete object during delete stream: {:?}", + delete_resp + ); } } - Err(_) => { - error!("Failed to fetch object during delete stream"); + Err(err) => { + error!("Failed to fetch object during delete stream: {:?}", err); } }; }) .await; + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + files_scanned.load(Ordering::Relaxed), + &Utc::now().date_naive().to_string(), + ); + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "DELETE", + files_deleted.load(Ordering::Relaxed), + &Utc::now().date_naive().to_string(), + ); Ok(()) } - // async fn _list_streams(&self) 
-> Result, ObjectStorageError> { - // let mut result_file_list = HashSet::new(); - // let resp = self.client.list_with_delimiter(None).await?; - - // let streams = resp - // .common_prefixes - // .iter() - // .flat_map(|path| path.parts()) - // .map(|name| name.as_ref().to_string()) - // .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR) - // .collect::>(); - - // for stream in streams { - // let stream_path = - // object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY)); - // let resp = self.client.list_with_delimiter(Some(&stream_path)).await?; - // if resp - // .objects - // .iter() - // .any(|name| name.location.filename().unwrap().ends_with("stream.json")) - // { - // result_file_list.insert(stream); - // } - // } - - // Ok(result_file_list) - // } - async fn _list_dates(&self, stream: &str) -> Result, ObjectStorageError> { - let resp = self + let resp: Result = self .client .list_with_delimiter(Some(&(stream.into()))) - .await?; + .await; + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + + let resp = match resp { + Ok(resp) => resp, + Err(err) => { + return Err(err.into()); + } + }; let common_prefixes = resp.common_prefixes; + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + // return prefixes at the root level let dates: Vec<_> = common_prefixes .iter() @@ -288,19 +294,24 @@ impl Gcs { Ok(dates) } - async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - let instant = Instant::now(); + async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { let bytes = tokio::fs::read(path).await?; - let result = self.client.put(&key.into(), bytes.into()).await?; - info!("Uploaded file to GCS: {:?}", result); - - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["UPLOAD_PARQUET", "200"]) - .observe(time); - Ok(()) + let result = self.client.put(&key.into(), bytes.into()).await; + increment_object_store_calls_by_date("gcs", "PUT", &Utc::now().date_naive().to_string()); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) + } + Err(err) => Err(err.into()), + } } async fn _upload_multipart( @@ -311,14 +322,46 @@ impl Gcs { let mut file = OpenOptions::new().read(true).open(path).await?; let location = &to_object_store_path(key); - let mut async_writer = self.client.put_multipart(location).await?; + // Track multipart initiation + let async_writer = self.client.put_multipart(location).await; + increment_object_store_calls_by_date( + "gcs", + "PUT_MULTIPART", + &Utc::now().date_naive().to_string(), + ); + let mut async_writer = match async_writer { + Ok(writer) => writer, + Err(err) => { + return Err(err.into()); + } + }; let meta = file.metadata().await?; let total_size = meta.len() as usize; if total_size < MIN_MULTIPART_UPLOAD_SIZE { let mut data = Vec::new(); file.read_to_end(&mut data).await?; - self.client.put(location, data.into()).await?; + + // Track single PUT operation for small files + let result = self.client.put(location, data.into()).await; + increment_object_store_calls_by_date( + "gcs", + "PUT", + &Utc::now().date_naive().to_string(), + ); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "PUT", + 1, + 
&Utc::now().date_naive().to_string(), + ); + } + Err(err) => { + return Err(err.into()); + } + } return Ok(()); } else { let mut data = Vec::new(); @@ -328,7 +371,7 @@ impl Gcs { let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE; let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 }; - // Upload each part + // Upload each part with metrics for part_number in 0..(total_parts) { let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE; let end_pos = if part_number == num_full_parts && has_final_partial_part { @@ -342,10 +385,21 @@ impl Gcs { // Extract this part's data let part_data = data[start_pos..end_pos].to_vec(); - // Upload the part - async_writer.put_part(part_data.into()).await?; + // Track individual part upload + let result = async_writer.put_part(part_data.into()).await; + increment_object_store_calls_by_date( + "gcs", + "PUT_MULTIPART", + &Utc::now().date_naive().to_string(), + ); + if result.is_err() { + return Err(result.err().unwrap().into()); + } } - if let Err(err) = async_writer.complete().await { + + // Track multipart completion + let complete_result = async_writer.complete().await; + if let Err(err) = complete_result { if let Err(abort_err) = async_writer.abort().await { error!( "Failed to abort multipart upload after completion failure: {:?}", @@ -353,7 +407,7 @@ impl Gcs { ); } return Err(err.into()); - }; + } } Ok(()) } @@ -366,12 +420,29 @@ impl ObjectStorage for Gcs { path: &RelativePath, ) -> Result { let path = &to_object_store_path(path); - let meta = self.client.head(path).await?; + + let meta = self.client.head(path).await; + increment_object_store_calls_by_date("gcs", "HEAD", &Utc::now().date_naive().to_string()); + let meta = match meta { + Ok(meta) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + meta + } + Err(err) => { + return Err(err.into()); + } + }; let store: Arc = self.client.clone(); let buf = object_store::buffered::BufReader::new(store, &meta); Ok(buf) } + async fn upload_multipart( &self, key: &RelativePath, @@ -379,8 +450,20 @@ impl ObjectStorage for Gcs { ) -> Result<(), ObjectStorageError> { self._upload_multipart(key, path).await } + async fn head(&self, path: &RelativePath) -> Result { - Ok(self.client.head(&to_object_store_path(path)).await?) + let result = self.client.head(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("gcs", "HEAD", &Utc::now().date_naive().to_string()); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn get_object(&self, path: &RelativePath) -> Result { @@ -392,8 +475,6 @@ impl ObjectStorage for Gcs { base_path: Option<&RelativePath>, filter_func: Box bool + Send>, ) -> Result, ObjectStorageError> { - let instant = Instant::now(); - let prefix = if let Some(base_path) = base_path { to_object_store_path(base_path) } else { @@ -403,8 +484,18 @@ impl ObjectStorage for Gcs { let mut list_stream = self.client.list(Some(&prefix)); let mut res = vec![]; + let mut files_scanned = 0; + + // Note: We track each streaming list item retrieval + while let Some(meta_result) = list_stream.next().await { + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + return Err(err.into()); + } + }; - while let Some(meta) = list_stream.next().await.transpose()? 
{ + files_scanned += 1; let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); if !ingestor_file { @@ -417,38 +508,62 @@ impl ObjectStorage for Gcs { .map_err(ObjectStorageError::PathError)?, ) .await?; - + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "gcs", + "GET", + &Utc::now().date_naive().to_string(), + ); res.push(byts); } - let instant = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(instant); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); Ok(res) } async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError> { - let time = Instant::now(); let mut path_arr = vec![]; + let mut files_scanned = 0; + let mut object_stream = self.client.list(Some(&self.root)); + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + + while let Some(meta_result) = object_stream.next().await { + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + return Err(err.into()); + } + }; - while let Some(meta) = object_stream.next().await.transpose()? { + files_scanned += 1; let flag = meta.location.filename().unwrap().starts_with("ingestor"); if flag { path_arr.push(RelativePathBuf::from(meta.location.as_ref())); } } - - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(time); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); Ok(path_arr) } @@ -471,15 +586,37 @@ impl ObjectStorage for Gcs { } async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { - Ok(self.client.delete(&to_object_store_path(path)).await?) + let result = self.client.delete(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("gcs", "DELETE", &Utc::now().date_naive().to_string()); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "DELETE", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn check(&self) -> Result<(), ObjectStorageError> { - Ok(self + let result = self .client .head(&to_object_store_path(&parseable_json_path())) - .await - .map(|_| ())?) + .await; + increment_object_store_calls_by_date("gcs", "HEAD", &Utc::now().date_naive().to_string()); + + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result.map(|_| ())?) } async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { @@ -490,19 +627,20 @@ impl ObjectStorage for Gcs { async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { let file = RelativePathBuf::from(&node_filename); - match self.client.delete(&to_object_store_path(&file)).await { - Ok(_) => Ok(()), - Err(err) => { - // if the object is not found, it is not an error - // the given url path was incorrect - if matches!(err, object_store::Error::NotFound { .. 
}) { - error!("Node does not exist"); - Err(err.into()) - } else { - error!("Error deleting node meta file: {:?}", err); - Err(err.into()) - } + + let result = self.client.delete(&to_object_store_path(&file)).await; + increment_object_store_calls_by_date("gcs", "DELETE", &Utc::now().date_naive().to_string()); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "DELETE", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) } + Err(err) => Err(err.into()), } } @@ -515,9 +653,14 @@ impl ObjectStorage for Gcs { async fn list_old_streams(&self) -> Result, ObjectStorageError> { let resp = self.client.list_with_delimiter(None).await?; - let common_prefixes = resp.common_prefixes; // get all dirs - + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); // return prefixes at the root level let dirs: HashSet<_> = common_prefixes .iter() @@ -530,10 +673,23 @@ impl ObjectStorage for Gcs { for dir in &dirs { let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}"); - let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; + let task = async move { + let result = self.client.head(&StorePath::from(key)).await; + increment_object_store_calls_by_date( + "gcs", + "HEAD", + &Utc::now().date_naive().to_string(), + ); + result.map(|_| ()) + }; stream_json_check.push(task); } - + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "HEAD", + dirs.len() as u64, + &Utc::now().date_naive().to_string(), + ); stream_json_check.try_collect::<()>().await?; Ok(dirs) @@ -552,8 +708,15 @@ impl ObjectStorage for Gcs { ) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; - - let hours = resp + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + + let hours: Vec = resp .common_prefixes .iter() .filter_map(|path| { @@ -581,8 +744,14 @@ impl ObjectStorage for Gcs { ) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; - - let minutes = resp + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + let minutes: Vec = resp .common_prefixes .iter() .filter_map(|path| { @@ -604,9 +773,7 @@ impl ObjectStorage for Gcs { } async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - self._upload_file(key, path).await?; - - Ok(()) + Ok(self._upload_file(key, path).await?) 
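
The two counter helpers invoked throughout these hunks, increment_object_store_calls_by_date and increment_files_scanned_in_object_store_calls_by_date, are defined in src/metrics, which is not part of this diff. A minimal sketch of the shape implied by the call sites — two labelled Prometheus counters keyed by provider, method and date, with registry registration omitted — might look like the following; everything beyond the names and label order seen above is an assumption:

use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts};

// Assumed shape only: the real definitions live in src/metrics and are not shown in this diff.
static OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("object_store_calls_by_date", "object store calls by provider, method and date"),
        &["provider", "method", "date"],
    )
    .expect("valid metric definition")
});

static FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("files_scanned_in_object_store_calls_by_date", "files touched per object store call, by provider, method and date"),
        &["provider", "method", "date"],
    )
    .expect("valid metric definition")
});

// Matches call sites such as increment_object_store_calls_by_date("gcs", "PUT", "2025-09-06").
pub fn increment_object_store_calls_by_date(provider: &str, method: &str, date: &str) {
    OBJECT_STORE_CALLS_BY_DATE
        .with_label_values(&[provider, method, date])
        .inc();
}

// The files-scanned variant also carries how many objects the call touched.
pub fn increment_files_scanned_in_object_store_calls_by_date(provider: &str, method: &str, count: u64, date: &str) {
    FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE
        .with_label_values(&[provider, method, date])
        .inc_by(count);
}
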
} fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path { @@ -629,7 +796,24 @@ impl ObjectStorage for Gcs { async fn list_dirs(&self) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from("/"); - let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let resp = self.client.list_with_delimiter(Some(&pre)).await; + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + let resp = match resp { + Ok(resp) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + + resp + } + Err(err) => { + return Err(err.into()); + } + }; Ok(resp .common_prefixes @@ -644,7 +828,24 @@ impl ObjectStorage for Gcs { relative_path: &RelativePath, ) -> Result, ObjectStorageError> { let prefix = object_store::path::Path::from(relative_path.as_str()); - let resp = self.client.list_with_delimiter(Some(&prefix)).await?; + + let resp = self.client.list_with_delimiter(Some(&prefix)).await; + increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + let resp = match resp { + Ok(resp) => { + increment_files_scanned_in_object_store_calls_by_date( + "gcs", + "LIST", + resp.common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + + resp + } + Err(err) => { + return Err(err.into()); + } + }; Ok(resp .common_prefixes diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 8157c2b41..2271cbf80 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -20,11 +20,11 @@ use std::{ collections::HashSet, path::{Path, PathBuf}, sync::Arc, - time::Instant, }; use async_trait::async_trait; use bytes::Bytes; +use chrono::Utc; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use fs_extra::file::CopyOptions; use futures::{TryStreamExt, stream::FuturesUnordered}; @@ -38,7 +38,9 @@ use tokio_stream::wrappers::ReadDirStream; use crate::{ handlers::http::users::USERS_ROOT_DIR, - metrics::storage::{StorageMetrics, azureblob::REQUEST_RESPONSE_TIME}, + metrics::{ + increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + }, option::validation, parseable::LogStream, storage::SETTINGS_ROOT_DIRECTORY, @@ -84,10 +86,6 @@ impl ObjectStorageProvider for FSConfig { fn get_endpoint(&self) -> String { self.root.to_str().unwrap().to_string() } - - fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - self.register_metrics(handler); - } } #[derive(Debug)] @@ -138,8 +136,6 @@ impl ObjectStorage for LocalFS { ))) } async fn get_object(&self, path: &RelativePath) -> Result { - let time = Instant::now(); - let file_path; // this is for the `get_manifest()` function because inside a snapshot, we store the absolute path (without `/`) on linux based OS @@ -163,33 +159,51 @@ impl ObjectStorage for LocalFS { }; } - let res: Result = match fs::read(file_path).await { - Ok(x) => Ok(x.into()), - Err(e) => match e.kind() { - std::io::ErrorKind::NotFound => { + let file_result = fs::read(file_path).await; + let res: Result = match file_result { + Ok(x) => { + // Record single file accessed successfully + increment_files_scanned_in_object_store_calls_by_date( + "localfs", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "localfs", + "GET", + &Utc::now().date_naive().to_string(), + ); + Ok(x.into()) + } + Err(e) => { + if e.kind() == 
std::io::ErrorKind::NotFound { Err(ObjectStorageError::NoSuchKey(path.to_string())) + } else { + Err(ObjectStorageError::UnhandledError(Box::new(e))) } - _ => Err(ObjectStorageError::UnhandledError(Box::new(e))), - }, + } }; - let status = if res.is_ok() { "200" } else { "400" }; - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", status]) - .observe(time); res } async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError> { - let time = Instant::now(); - let mut path_arr = vec![]; - let mut entries = fs::read_dir(&self.root).await?; + let mut files_scanned = 0u64; + + let entries_result = fs::read_dir(&self.root).await; + let mut entries = match entries_result { + Ok(entries) => entries, + Err(err) => { + return Err(err.into()); + } + }; while let Some(entry) = entries.next_entry().await? { + files_scanned += 1; let flag = entry .path() .file_name() @@ -206,11 +220,18 @@ impl ObjectStorage for LocalFS { } } - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) // this might not be the right status code - .observe(time); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "localfs", + "LIST", + files_scanned, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "localfs", + "LIST", + &Utc::now().date_naive().to_string(), + ); Ok(path_arr) } @@ -220,16 +241,22 @@ impl ObjectStorage for LocalFS { base_path: Option<&RelativePath>, filter_func: Box bool + std::marker::Send + 'static>, ) -> Result, ObjectStorageError> { - let time = Instant::now(); - let prefix = if let Some(path) = base_path { path.to_path(&self.root) } else { self.root.clone() }; - let mut entries = fs::read_dir(&prefix).await?; + let entries_result = fs::read_dir(&prefix).await; + let mut entries = match entries_result { + Ok(entries) => entries, + Err(err) => { + return Err(err.into()); + } + }; + let mut res = Vec::new(); + let mut files_scanned = 0; while let Some(entry) = entries.next_entry().await? 
{ let path = entry .path() @@ -240,22 +267,48 @@ impl ObjectStorage for LocalFS { .to_str() .expect("file name is parseable to str") .to_owned(); + + files_scanned += 1; let ingestor_file = filter_func(path); if !ingestor_file { continue; } - let file = fs::read(entry.path()).await?; - res.push(file.into()); + let file_result = fs::read(entry.path()).await; + match file_result { + Ok(file) => { + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "localfs", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "localfs", + "GET", + &Utc::now().date_naive().to_string(), + ); + res.push(file.into()); + } + Err(err) => { + return Err(err.into()); + } + } } - // maybe change the return code - let status = if res.is_empty() { "200" } else { "400" }; - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", status]) - .observe(time); + increment_files_scanned_in_object_store_calls_by_date( + "localfs", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "localfs", + "LIST", + &Utc::now().date_naive().to_string(), + ); Ok(res) } @@ -265,49 +318,109 @@ impl ObjectStorage for LocalFS { path: &RelativePath, resource: Bytes, ) -> Result<(), ObjectStorageError> { - let time = Instant::now(); - let path = self.path_in_root(path); if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } - let res = fs::write(path, resource).await; - let status = if res.is_ok() { "200" } else { "400" }; - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["PUT", status]) - .observe(time); + let res = fs::write(path, resource).await; + if res.is_ok() { + // Record single file written successfully + increment_files_scanned_in_object_store_calls_by_date( + "localfs", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "localfs", + "PUT", + &Utc::now().date_naive().to_string(), + ); + } res.map_err(Into::into) } async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { let path = self.path_in_root(path); - tokio::fs::remove_dir_all(path).await?; + + let result = tokio::fs::remove_dir_all(path).await; + if result.is_ok() { + increment_object_store_calls_by_date( + "localfs", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + } + result?; Ok(()) } async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { let path = self.path_in_root(path); - tokio::fs::remove_file(path).await?; + + let result = tokio::fs::remove_file(path).await; + if result.is_ok() { + // Record single file deleted successfully + increment_files_scanned_in_object_store_calls_by_date( + "localfs", + "DELETE", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date( + "localfs", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + } + + result?; Ok(()) } async fn check(&self) -> Result<(), ObjectStorageError> { - fs::create_dir_all(&self.root) - .await - .map_err(|e| ObjectStorageError::UnhandledError(e.into())) + let result = fs::create_dir_all(&self.root).await; + if result.is_ok() { + increment_object_store_calls_by_date( + "localfs", + "HEAD", + &Utc::now().date_naive().to_string(), + ); + } + + result.map_err(|e| ObjectStorageError::UnhandledError(e.into())) } async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { let path = 
self.root.join(stream_name); - Ok(fs::remove_dir_all(path).await?) + + let result = fs::remove_dir_all(path).await; + if result.is_ok() { + increment_object_store_calls_by_date( + "localfs", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { let path = self.root.join(node_filename); - Ok(fs::remove_file(path).await?) + + let result = fs::remove_file(path).await; + if result.is_ok() { + increment_object_store_calls_by_date( + "localfs", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn list_streams(&self) -> Result, ObjectStorageError> { @@ -318,7 +431,22 @@ impl ObjectStorage for LocalFS { ALERTS_ROOT_DIRECTORY, SETTINGS_ROOT_DIRECTORY, ]; - let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); + + let result = fs::read_dir(&self.root).await; + let directories = match result { + Ok(read_dir) => { + increment_object_store_calls_by_date( + "localfs", + "LIST", + &Utc::now().date_naive().to_string(), + ); + ReadDirStream::new(read_dir) + } + Err(err) => { + return Err(err.into()); + } + }; + let entries: Vec = directories.try_collect().await?; let entries = entries .into_iter() @@ -339,7 +467,22 @@ impl ObjectStorage for LocalFS { ALERTS_ROOT_DIRECTORY, SETTINGS_ROOT_DIRECTORY, ]; - let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); + + let result = fs::read_dir(&self.root).await; + let directories = match result { + Ok(read_dir) => { + increment_object_store_calls_by_date( + "localfs", + "LIST", + &Utc::now().date_naive().to_string(), + ); + ReadDirStream::new(read_dir) + } + Err(err) => { + return Err(err.into()); + } + }; + let entries: Vec = directories.try_collect().await?; let entries = entries .into_iter() @@ -354,7 +497,22 @@ impl ObjectStorage for LocalFS { } async fn list_dirs(&self) -> Result, ObjectStorageError> { - let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?) + let result = fs::read_dir(&self.root).await; + let read_dir = match result { + Ok(read_dir) => { + increment_object_store_calls_by_date( + "localfs", + "LIST", + &Utc::now().date_naive().to_string(), + ); + read_dir + } + Err(err) => { + return Err(err.into()); + } + }; + + let dirs = ReadDirStream::new(read_dir) .try_collect::>() .await? .into_iter() @@ -375,7 +533,16 @@ impl ObjectStorage for LocalFS { relative_path: &RelativePath, ) -> Result, ObjectStorageError> { let root = self.root.join(relative_path.as_str()); - let dirs = ReadDirStream::new(fs::read_dir(root).await?) + + let result = fs::read_dir(root).await; + let read_dir = match result { + Ok(read_dir) => read_dir, + Err(err) => { + return Err(err.into()); + } + }; + + let dirs = ReadDirStream::new(read_dir) .try_collect::>() .await? 
.into_iter() @@ -393,7 +560,23 @@ impl ObjectStorage for LocalFS { async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { let path = self.root.join(stream_name); - let directories = ReadDirStream::new(fs::read_dir(&path).await?); + + let result = fs::read_dir(&path).await; + let read_dir = match result { + Ok(read_dir) => { + increment_object_store_calls_by_date( + "localfs", + "LIST", + &Utc::now().date_naive().to_string(), + ); + read_dir + } + Err(err) => { + return Err(err.into()); + } + }; + + let directories = ReadDirStream::new(read_dir); let entries: Vec = directories.try_collect().await?; let entries = entries.into_iter().map(dir_name); let dates: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?; @@ -448,8 +631,19 @@ impl ObjectStorage for LocalFS { if let Some(path) = to_path.parent() { fs::create_dir_all(path).await?; } - let _ = fs_extra::file::copy(path, to_path, &op)?; - Ok(()) + + let result = fs_extra::file::copy(path, to_path, &op); + match result { + Ok(_) => { + increment_object_store_calls_by_date( + "localfs", + "PUT", + &Utc::now().date_naive().to_string(), + ); + Ok(()) + } + Err(err) => Err(err.into()), + } } fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path { diff --git a/src/storage/metrics_layer.rs b/src/storage/metrics_layer.rs index cfaaeb6d2..945ee6ba8 100644 --- a/src/storage/metrics_layer.rs +++ b/src/storage/metrics_layer.rs @@ -30,20 +30,49 @@ use object_store::{ PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, path::Path, }; -/* NOTE: Keeping these imports as they would make migration to object_store 0.10.0 easier -use object_store::{MultipartUpload, PutMultipartOpts, PutPayload} -*/ +use crate::metrics::STORAGE_REQUEST_RESPONSE_TIME; -use crate::metrics::storage::s3::QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME; +// Public helper function to map object_store errors to HTTP status codes +pub fn error_to_status_code(err: &object_store::Error) -> &'static str { + match err { + // 400 Bad Request - Client errors + object_store::Error::Generic { .. } => "400", + + // 401 Unauthorized - Authentication required + object_store::Error::Unauthenticated { .. } => "401", + + // 404 Not Found - Resource doesn't exist + object_store::Error::NotFound { .. } => "404", + + // 409 Conflict - Resource already exists + object_store::Error::AlreadyExists { .. } => "409", + + // 412 Precondition Failed - If-Match, If-None-Match, etc. failed + object_store::Error::Precondition { .. } => "412", + + // 304 Not Modified + object_store::Error::NotModified { .. } => "304", + + // 501 Not Implemented - Feature not supported + object_store::Error::NotSupported { .. 
} => "501", + + // 500 Internal Server Error - All other errors + _ => "500", + } +} #[derive(Debug)] pub struct MetricLayer { inner: T, + provider: String, } impl MetricLayer { - pub fn new(inner: T) -> Self { - Self { inner } + pub fn new(inner: T, provider: &str) -> Self { + Self { + inner, + provider: provider.to_string(), + } } } @@ -62,12 +91,18 @@ impl ObjectStore for MetricLayer { bytes: PutPayload, /* PutPayload */ ) -> ObjectStoreResult { let time = time::Instant::now(); - let put_result = self.inner.put(location, bytes).await?; + let put_result = self.inner.put(location, bytes).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["PUT", "200"]) + + let status = match &put_result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "PUT", status]) .observe(elapsed); - return Ok(put_result); + put_result } async fn put_opts( @@ -77,12 +112,18 @@ impl ObjectStore for MetricLayer { opts: PutOptions, ) -> ObjectStoreResult { let time = time::Instant::now(); - let put_result = self.inner.put_opts(location, payload, opts).await?; + let put_result = self.inner.put_opts(location, payload, opts).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["PUT_OPTS", "200"]) + + let status = match &put_result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "PUT_OPTS", status]) .observe(elapsed); - return Ok(put_result); + put_result } // // ! removed in object_store 0.10.0 @@ -94,7 +135,7 @@ impl ObjectStore for MetricLayer { // let time = time::Instant::now(); // let elapsed = time.elapsed().as_secs_f64(); // self.inner.abort_multipart(location, multipart_id).await?; - // QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + // STORAGE_REQUEST_RESPONSE_TIME // .with_label_values(&["PUT_MULTIPART_ABORT", "200"]) // .observe(elapsed); // Ok(()) @@ -107,56 +148,84 @@ impl ObjectStore for MetricLayer { opts: PutMultipartOpts, ) -> ObjectStoreResult> { let time = time::Instant::now(); - let multipart_upload = self.inner.put_multipart_opts(location, opts).await?; + let result = self.inner.put_multipart_opts(location, opts).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["PUT_MULTIPART_OPTS", "200"]) - .observe(elapsed); - Ok(multipart_upload) + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "PUT_MULTIPART_OPTS", status]) + .observe(elapsed); + result } // todo completly tracking multipart upload async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> /* ObjectStoreResult<(MultipartId, Box)> */ { let time = time::Instant::now(); - let multipart_upload = self.inner.put_multipart(location).await?; + let result = self.inner.put_multipart(location).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["PUT_MULTIPART", "200"]) - .observe(elapsed); - Ok(multipart_upload) + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "PUT_MULTIPART", status]) + .observe(elapsed); + result } async fn get(&self, location: &Path) -> ObjectStoreResult { let 
time = time::Instant::now(); - let res = self.inner.get(location).await?; + let get_result = self.inner.get(location).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) + + let status = match &get_result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "GET", status]) .observe(elapsed); - Ok(res) + get_result } async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { let time = time::Instant::now(); - let res = self.inner.get_opts(location, options).await?; + let result = self.inner.get_opts(location, options).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["GET_OPTS", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "GET_OPTS", status]) .observe(elapsed); - Ok(res) + result } async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { let time = time::Instant::now(); - let res = self.inner.get_range(location, range).await?; + let result = self.inner.get_range(location, range).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["GET_RANGE", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "GET_RANGE", status]) .observe(elapsed); - Ok(res) + result } async fn get_ranges( @@ -165,32 +234,50 @@ impl ObjectStore for MetricLayer { ranges: &[Range], ) -> ObjectStoreResult> { let time = time::Instant::now(); - let res = self.inner.get_ranges(location, ranges).await?; + let result = self.inner.get_ranges(location, ranges).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["GET_RANGES", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "GET_RANGES", status]) .observe(elapsed); - Ok(res) + result } async fn head(&self, location: &Path) -> ObjectStoreResult { let time = time::Instant::now(); - let res = self.inner.head(location).await?; + let result = self.inner.head(location).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["HEAD", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "HEAD", status]) .observe(elapsed); - Ok(res) + result } async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { let time = time::Instant::now(); - let res = self.inner.delete(location).await?; + let result = self.inner.delete(location).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["DELETE", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "DELETE", status]) .observe(elapsed); - Ok(res) + result } fn delete_stream<'a>( @@ -229,52 +316,82 @@ impl ObjectStore for MetricLayer { async fn list_with_delimiter(&self, prefix: 
Option<&Path>) -> ObjectStoreResult { let time = time::Instant::now(); - let res = self.inner.list_with_delimiter(prefix).await?; + let result = self.inner.list_with_delimiter(prefix).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["LIST_DELIM", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "LIST_DELIM", status]) .observe(elapsed); - Ok(res) + result } async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { let time = time::Instant::now(); - let res = self.inner.copy(from, to).await?; + let result = self.inner.copy(from, to).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["COPY", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "COPY", status]) .observe(elapsed); - Ok(res) + result } async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { let time = time::Instant::now(); - let res = self.inner.rename(from, to).await?; + let result = self.inner.rename(from, to).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["RENAME", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "RENAME", status]) .observe(elapsed); - Ok(res) + result } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { let time = time::Instant::now(); - let res = self.inner.copy_if_not_exists(from, to).await?; + let result = self.inner.copy_if_not_exists(from, to).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["COPY_IF", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "COPY_IF", status]) .observe(elapsed); - Ok(res) + result } async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { let time = time::Instant::now(); - let res = self.inner.rename_if_not_exists(from, to).await?; + let result = self.inner.rename_if_not_exists(from, to).await; let elapsed = time.elapsed().as_secs_f64(); - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["RENAME_IF", "200"]) + + let status = match &result { + Ok(_) => "200", + Err(err) => error_to_status_code(err), + }; + + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[&self.provider, "RENAME_IF", status]) .observe(elapsed); - Ok(res) + result } } @@ -293,7 +410,7 @@ impl Stream for StreamMetricWrapper<'_, N, T> { ) -> Poll> { match self.inner.poll_next_unpin(cx) { t @ Poll::Ready(None) => { - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&self.labels) .observe(self.time.elapsed().as_secs_f64()); t diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 41639d025..225244bb9 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -16,7 +16,6 @@ * */ -use actix_web_prometheus::PrometheusMetrics; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; @@ -50,7 +49,8 @@ use 
crate::handlers::http::fetch_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; -use crate::metrics::storage::StorageMetrics; +use crate::metrics::increment_parquets_stored_by_date; +use crate::metrics::increment_parquets_stored_size_by_date; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; use crate::option::Mode; use crate::parseable::{LogStream, PARSEABLE, Stream}; @@ -169,18 +169,24 @@ fn update_storage_metrics( ) -> Result<(), ObjectStorageError> { let mut file_date_part = filename.split('.').collect::>()[0]; file_date_part = file_date_part.split('=').collect::>()[1]; - let compressed_size = path.metadata().map_or(0, |meta| meta.len()); - + let compressed_size = path + .metadata() + .map(|m| m.len()) + .map_err(|e| ObjectStorageError::Custom(format!("metadata failed for {filename}: {e}")))?; STORAGE_SIZE .with_label_values(&["data", stream_name, "parquet"]) .add(compressed_size as i64); EVENTS_STORAGE_SIZE_DATE .with_label_values(&["data", stream_name, "parquet", file_date_part]) - .add(compressed_size as i64); + .inc_by(compressed_size); LIFETIME_EVENTS_STORAGE_SIZE .with_label_values(&["data", stream_name, "parquet"]) .add(compressed_size as i64); + // billing metrics for parquet storage + increment_parquets_stored_by_date(file_date_part); + increment_parquets_stored_size_by_date(compressed_size, file_date_part); + Ok(()) } @@ -242,7 +248,7 @@ async fn validate_uploaded_parquet_file( } } -pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { +pub trait ObjectStorageProvider: std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder; fn construct_client(&self) -> Arc; fn get_object_store(&self) -> Arc { @@ -251,7 +257,6 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync STORE.get_or_init(|| self.construct_client()).clone() } fn get_endpoint(&self) -> String; - fn register_store_metrics(&self, handler: &PrometheusMetrics); fn name(&self) -> &'static str; } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index bc8d57a63..853fef8b7 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -21,12 +21,16 @@ use std::{ fmt::Display, path::Path, str::FromStr, - sync::Arc, - time::{Duration, Instant}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, }; use async_trait::async_trait; use bytes::Bytes; +use chrono::Utc; use datafusion::{ datasource::listing::ListingTableUrl, execution::{ @@ -44,10 +48,12 @@ use object_store::{ }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; -use tracing::{error, info}; +use tracing::error; use crate::{ - metrics::storage::{StorageMetrics, azureblob::REQUEST_RESPONSE_TIME}, + metrics::{ + increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + }, parseable::LogStream, }; @@ -299,7 +305,7 @@ impl ObjectStorageProvider for S3Config { // limit objectstore to a concurrent request limit let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); - let s3 = MetricLayer::new(s3); + let s3 = MetricLayer::new(s3, "s3"); let object_store_registry = DefaultObjectStoreRegistry::new(); let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap(); @@ -321,10 +327,6 @@ impl ObjectStorageProvider for S3Config { fn get_endpoint(&self) -> String { 
format!("{}/{}", self.endpoint_url, self.bucket_name) } - - fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - self.register_metrics(handler) - } } #[derive(Debug)] @@ -336,26 +338,21 @@ pub struct S3 { impl S3 { async fn _get_object(&self, path: &RelativePath) -> Result { - let instant = Instant::now(); - let resp = self.client.get(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("s3", "GET", &Utc::now().date_naive().to_string()); match resp { Ok(resp) => { - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(time); let body = resp.bytes().await?; + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); Ok(body) } - Err(err) => { - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "400"]) - .observe(time); - Err(err.into()) - } + Err(err) => Err(err.into()), } } @@ -364,55 +361,94 @@ impl S3 { path: &RelativePath, resource: PutPayload, ) -> Result<(), ObjectStorageError> { - let time = Instant::now(); let resp = self.client.put(&to_object_store_path(path), resource).await; - let status = if resp.is_ok() { "200" } else { "400" }; - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["PUT", status]) - .observe(time); - - if let Err(object_store::Error::NotFound { source, .. }) = &resp { - let source_str = source.to_string(); - if source_str.contains("NoSuchBucket") { - return Err(ObjectStorageError::Custom( - format!("Bucket '{}' does not exist in S3.", self.bucket).to_string(), - )); + increment_object_store_calls_by_date("s3", "PUT", &Utc::now().date_naive().to_string()); + match resp { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) } + Err(err) => Err(err.into()), } - - resp.map(|_| ()).map_err(|err| err.into()) } async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> { + let files_scanned = Arc::new(AtomicU64::new(0)); + let files_deleted = Arc::new(AtomicU64::new(0)); + // Track LIST operation let object_stream = self.client.list(Some(&(key.into()))); + increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); object_stream .for_each_concurrent(None, |x| async { + files_scanned.fetch_add(1, Ordering::Relaxed); + match x { Ok(obj) => { - if (self.client.delete(&obj.location).await).is_err() { - error!("Failed to fetch object during delete stream"); + files_deleted.fetch_add(1, Ordering::Relaxed); + let delete_resp = self.client.delete(&obj.location).await; + increment_object_store_calls_by_date( + "s3", + "DELETE", + &Utc::now().date_naive().to_string(), + ); + if delete_resp.is_err() { + error!( + "Failed to delete object during delete stream: {:?}", + delete_resp + ); } } - Err(_) => { - error!("Failed to fetch object during delete stream"); + Err(err) => { + error!("Failed to fetch object during delete stream: {:?}", err); } }; }) .await; + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "LIST", + files_scanned.load(Ordering::Relaxed), + &Utc::now().date_naive().to_string(), + ); + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "DELETE", + files_deleted.load(Ordering::Relaxed), + &Utc::now().date_naive().to_string(), + ); Ok(()) } async fn _list_dates(&self, stream: &str) -> Result, ObjectStorageError> { - let resp = self + 
let resp: Result = self .client .list_with_delimiter(Some(&(stream.into()))) - .await?; + .await; + increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + + let resp = match resp { + Ok(resp) => resp, + Err(err) => { + return Err(err.into()); + } + }; let common_prefixes = resp.common_prefixes; + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "LIST", + common_prefixes.len() as u64, + &Utc::now().date_naive().to_string(), + ); + // return prefixes at the root level let dates: Vec<_> = common_prefixes .iter() @@ -423,66 +459,23 @@ impl S3 { Ok(dates) } - // async fn _list_manifest_files( - // &self, - // stream: &str, - // ) -> Result>, ObjectStorageError> { - // let mut result_file_list: BTreeMap> = BTreeMap::new(); - // let resp = self - // .client - // .list_with_delimiter(Some(&(stream.into()))) - // .await?; - // warn!(resp=?resp); - // let dates = resp - // .common_prefixes - // .iter() - // .flat_map(|path| path.parts()) - // .filter(|name| name.as_ref() != stream && name.as_ref() != STREAM_ROOT_DIRECTORY) - // .map(|name| name.as_ref().to_string()) - // .collect::>(); - // warn!(dates=?dates); - - // for date in dates { - // let date_path = object_store::path::Path::from(format!("{}/{}", stream, &date)); - // let resp = self.client.list_with_delimiter(Some(&date_path)).await?; - // warn!(date_path=?resp); - // let manifests: Vec = resp - // .objects - // .iter() - // .filter(|name| name.location.filename().unwrap().ends_with("manifest.json")) - // .map(|name| name.location.to_string()) - // .collect(); - // result_file_list.entry(date).or_default().extend(manifests); - // } - // Ok(result_file_list) - // } - async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - let instant = Instant::now(); - - // // TODO: Uncomment this when multipart is fixed - // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64; - - let should_multipart = false; - - let res = if should_multipart { - // self._upload_multipart(key, path).await - // this branch will never get executed - Ok(()) - } else { - let bytes = tokio::fs::read(path).await?; - let result = self.client.put(&key.into(), bytes.into()).await?; - info!("Uploaded file to S3: {:?}", result); - Ok(()) - }; - - let status = if res.is_ok() { "200" } else { "400" }; - let time = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["UPLOAD_PARQUET", status]) - .observe(time); - - res + let bytes = tokio::fs::read(path).await?; + + let result = self.client.put(&key.into(), bytes.into()).await; + increment_object_store_calls_by_date("s3", "PUT", &Utc::now().date_naive().to_string()); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + Ok(()) + } + Err(err) => Err(err.into()), + } } async fn _upload_multipart( @@ -493,14 +486,43 @@ impl S3 { let mut file = OpenOptions::new().read(true).open(path).await?; let location = &to_object_store_path(key); - let mut async_writer = self.client.put_multipart(location).await?; + // Track multipart initiation + let async_writer = self.client.put_multipart(location).await; + increment_object_store_calls_by_date( + "s3", + "PUT_MULTIPART", + &Utc::now().date_naive().to_string(), + ); + let mut async_writer = match async_writer { + Ok(writer) => writer, + Err(err) => { + return Err(err.into()); + } + }; let meta = file.metadata().await?; let total_size = meta.len() as 
usize; if total_size < MIN_MULTIPART_UPLOAD_SIZE { let mut data = Vec::new(); file.read_to_end(&mut data).await?; - self.client.put(location, data.into()).await?; + + // Track single PUT operation for small files + let result = self.client.put(location, data.into()).await; + increment_object_store_calls_by_date("s3", "PUT", &Utc::now().date_naive().to_string()); + match result { + Ok(_) => { + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "PUT", + 1, + &Utc::now().date_naive().to_string(), + ); + } + Err(err) => { + return Err(err.into()); + } + } + // async_writer.put_part(data.into()).await?; // async_writer.complete().await?; return Ok(()); @@ -514,7 +536,7 @@ impl S3 { let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE; let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 }; - // Upload each part + // Upload each part with metrics for part_number in 0..(total_parts) { let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE; let end_pos = if part_number == num_full_parts && has_final_partial_part { @@ -528,15 +550,27 @@ impl S3 { // Extract this part's data let part_data = data[start_pos..end_pos].to_vec(); - // Upload the part - async_writer.put_part(part_data.into()).await?; + // Track individual part upload + let result = async_writer.put_part(part_data.into()).await; + increment_object_store_calls_by_date( + "s3", + "PUT_MULTIPART", + &Utc::now().date_naive().to_string(), + ); + if result.is_err() { + return Err(result.err().unwrap().into()); + } // upload_parts.push(part_number as u64 + 1); } - if let Err(err) = async_writer.complete().await { + + // Track multipart completion + let complete_result = async_writer.complete().await; + if let Err(err) = complete_result { error!("Failed to complete multipart upload. {:?}", err); async_writer.abort().await?; - }; + return Err(err.into()); + } } Ok(()) } @@ -549,12 +583,28 @@ impl ObjectStorage for S3 { path: &RelativePath, ) -> Result { let path = &to_object_store_path(path); - let meta = self.client.head(path).await?; + let meta = self.client.head(path).await; + increment_object_store_calls_by_date("s3", "HEAD", &Utc::now().date_naive().to_string()); + let meta = match meta { + Ok(meta) => { + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + meta + } + Err(err) => { + return Err(err.into()); + } + }; let store: Arc = Arc::new(self.client.clone()); let buf = object_store::buffered::BufReader::new(store, &meta); Ok(buf) } + async fn upload_multipart( &self, key: &RelativePath, @@ -562,8 +612,20 @@ impl ObjectStorage for S3 { ) -> Result<(), ObjectStorageError> { self._upload_multipart(key, path).await } + async fn head(&self, path: &RelativePath) -> Result { - Ok(self.client.head(&to_object_store_path(path)).await?) + let result = self.client.head(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("s3", "HEAD", &Utc::now().date_naive().to_string()); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "HEAD", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) 
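
Every backend in this change repeats the same bookkeeping around single-object calls: run the client call, count one call for (provider, method, today), and count one scanned file only on success. The diff inlines that at each call site; purely as an illustration of the pattern (this helper is not part of the change), it could be factored as:

use chrono::Utc;

// Hypothetical wrapper, not code from this diff: applies the same per-call
// accounting that the hunks above repeat inline around each client call.
async fn with_call_metrics<T, E, F>(provider: &str, method: &str, call: F) -> Result<T, E>
where
    F: std::future::Future<Output = Result<T, E>>,
{
    let date = Utc::now().date_naive().to_string();
    let result = call.await;
    // A call is recorded whether or not it succeeded, as in the hunks above.
    increment_object_store_calls_by_date(provider, method, &date);
    if result.is_ok() {
        // Single-object operations count exactly one scanned file on success.
        increment_files_scanned_in_object_store_calls_by_date(provider, method, 1, &date);
    }
    result
}

// e.g. with_call_metrics("s3", "HEAD", self.client.head(&to_object_store_path(path))).await
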
} async fn get_object(&self, path: &RelativePath) -> Result { @@ -575,8 +637,6 @@ impl ObjectStorage for S3 { base_path: Option<&RelativePath>, filter_func: Box bool + Send>, ) -> Result, ObjectStorageError> { - let instant = Instant::now(); - let prefix = if let Some(base_path) = base_path { to_object_store_path(base_path) } else { @@ -586,8 +646,18 @@ impl ObjectStorage for S3 { let mut list_stream = self.client.list(Some(&prefix)); let mut res = vec![]; + let mut files_scanned = 0; + + // Note: We track each streaming list item retrieval + while let Some(meta_result) = list_stream.next().await { + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + return Err(err.into()); + } + }; - while let Some(meta) = list_stream.next().await.transpose()? { + files_scanned += 1; let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); if !ingestor_file { @@ -600,38 +670,57 @@ impl ObjectStorage for S3 { .map_err(ObjectStorageError::PathError)?, ) .await?; - + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "GET", + 1, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("s3", "GET", &Utc::now().date_naive().to_string()); res.push(byts); } - - let instant = instant.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(instant); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); Ok(res) } async fn get_ingestor_meta_file_paths( &self, ) -> Result, ObjectStorageError> { - let time = Instant::now(); let mut path_arr = vec![]; + let mut files_scanned = 0; + let mut object_stream = self.client.list(Some(&self.root)); + increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + + while let Some(meta_result) = object_stream.next().await { + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + return Err(err.into()); + } + }; - while let Some(meta) = object_stream.next().await.transpose()? { + files_scanned += 1; let flag = meta.location.filename().unwrap().starts_with("ingestor"); if flag { path_arr.push(RelativePathBuf::from(meta.location.as_ref())); } } - - let time = time.elapsed().as_secs_f64(); - REQUEST_RESPONSE_TIME - .with_label_values(&["GET", "200"]) - .observe(time); - + // Record total files scanned + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "LIST", + files_scanned as u64, + &Utc::now().date_naive().to_string(), + ); Ok(path_arr) } @@ -654,15 +743,37 @@ impl ObjectStorage for S3 { } async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { - Ok(self.client.delete(&to_object_store_path(path)).await?) + let result = self.client.delete(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("s3", "DELETE", &Utc::now().date_naive().to_string()); + if result.is_ok() { + increment_files_scanned_in_object_store_calls_by_date( + "s3", + "DELETE", + 1, + &Utc::now().date_naive().to_string(), + ); + } + + Ok(result?) } async fn check(&self) -> Result<(), ObjectStorageError> { - Ok(self + let result = self .client .head(&to_object_store_path(&parseable_json_path())) - .await - .map(|_| ())?) 
+            .await;
+        increment_object_store_calls_by_date("s3", "HEAD", &Utc::now().date_naive().to_string());
+
+        if result.is_ok() {
+            increment_files_scanned_in_object_store_calls_by_date(
+                "s3",
+                "HEAD",
+                1,
+                &Utc::now().date_naive().to_string(),
+            );
+        }
+
+        Ok(result.map(|_| ())?)
     }

     async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
@@ -673,19 +784,20 @@ impl ObjectStorage for S3 {

     async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> {
         let file = RelativePathBuf::from(&node_filename);
-        match self.client.delete(&to_object_store_path(&file)).await {
-            Ok(_) => Ok(()),
-            Err(err) => {
-                // if the object is not found, it is not an error
-                // the given url path was incorrect
-                if matches!(err, object_store::Error::NotFound { .. }) {
-                    error!("Node does not exist");
-                    Err(err.into())
-                } else {
-                    error!("Error deleting node meta file: {:?}", err);
-                    Err(err.into())
-                }
+
+        let result = self.client.delete(&to_object_store_path(&file)).await;
+        increment_object_store_calls_by_date("s3", "DELETE", &Utc::now().date_naive().to_string());
+        match result {
+            Ok(_) => {
+                increment_files_scanned_in_object_store_calls_by_date(
+                    "s3",
+                    "DELETE",
+                    1,
+                    &Utc::now().date_naive().to_string(),
+                );
+                Ok(())
             }
+            Err(err) => Err(err.into()),
         }
     }

@@ -698,9 +810,14 @@ impl ObjectStorage for S3 {

     async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
         let resp = self.client.list_with_delimiter(None).await?;
-
         let common_prefixes = resp.common_prefixes; // get all dirs
-
+        increment_files_scanned_in_object_store_calls_by_date(
+            "s3",
+            "LIST",
+            common_prefixes.len() as u64,
+            &Utc::now().date_naive().to_string(),
+        );
+        increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string());
         // return prefixes at the root level
         let dirs: HashSet<_> = common_prefixes
             .iter()
@@ -713,10 +830,23 @@ impl ObjectStorage for S3 {

         for dir in &dirs {
             let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}");
-            let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
+            let task = async move {
+                let result = self.client.head(&StorePath::from(key)).await;
+                increment_object_store_calls_by_date(
+                    "s3",
+                    "HEAD",
+                    &Utc::now().date_naive().to_string(),
+                );
+                result.map(|_| ())
+            };
             stream_json_check.push(task);
         }
-
+        increment_files_scanned_in_object_store_calls_by_date(
+            "s3",
+            "HEAD",
+            dirs.len() as u64,
+            &Utc::now().date_naive().to_string(),
+        );
         stream_json_check.try_collect::<()>().await?;

         Ok(dirs)
@@ -735,6 +865,13 @@ impl ObjectStorage for S3 {
     ) -> Result<Vec<String>, ObjectStorageError> {
         let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
         let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+        increment_files_scanned_in_object_store_calls_by_date(
+            "s3",
+            "LIST",
+            resp.common_prefixes.len() as u64,
+            &Utc::now().date_naive().to_string(),
+        );
+        increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string());

         let hours: Vec<String> = resp
             .common_prefixes
@@ -764,7 +901,13 @@ impl ObjectStorage for S3 {
     ) -> Result<Vec<String>, ObjectStorageError> {
         let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
         let resp = self.client.list_with_delimiter(Some(&pre)).await?;
-
+        increment_files_scanned_in_object_store_calls_by_date(
+            "s3",
+            "LIST",
+            resp.common_prefixes.len() as u64,
+            &Utc::now().date_naive().to_string(),
+        );
+        increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string());
         let minutes: Vec<String> = resp
             .common_prefixes
             .iter()
@@ -786,19 +929,8 @@ impl ObjectStorage for S3 {
         Ok(minutes)
     }

-    // async fn list_manifest_files(
-    //     &self,
-    //     stream_name: &str,
-    // ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
-    //     let files = self._list_manifest_files(stream_name).await?;
-
-    //     Ok(files)
-    // }
-
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
-        self._upload_file(key, path).await?;
-
-        Ok(())
+        Ok(self._upload_file(key, path).await?)
     }

     fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path {
@@ -821,7 +953,23 @@ impl ObjectStorage for S3 {

     async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
         let pre = object_store::path::Path::from("/");
-        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+        let resp = self.client.list_with_delimiter(Some(&pre)).await;
+        increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string());
+        let resp = match resp {
+            Ok(resp) => {
+                increment_files_scanned_in_object_store_calls_by_date(
+                    "s3",
+                    "LIST",
+                    resp.common_prefixes.len() as u64,
+                    &Utc::now().date_naive().to_string(),
+                );
+
+                resp
+            }
+            Err(err) => {
+                return Err(err.into());
+            }
+        };

         Ok(resp
             .common_prefixes
@@ -836,7 +984,24 @@ impl ObjectStorage for S3 {
         relative_path: &RelativePath,
     ) -> Result<Vec<String>, ObjectStorageError> {
         let prefix = object_store::path::Path::from(relative_path.as_str());
-        let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
+
+        let resp = self.client.list_with_delimiter(Some(&prefix)).await;
+        increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string());
+        let resp = match resp {
+            Ok(resp) => {
+                increment_files_scanned_in_object_store_calls_by_date(
+                    "s3",
+                    "LIST",
+                    resp.common_prefixes.len() as u64,
+                    &Utc::now().date_naive().to_string(),
+                );
+
+                resp
+            }
+            Err(err) => {
+                return Err(err.into());
+            }
+        };

         Ok(resp
             .common_prefixes