diff --git a/Cargo.lock b/Cargo.lock
index 2071c3867..71df22a7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -754,9 +754,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-async"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6831e34a636bb7a72758aae4cdf83c2f2163a2a1cddc0c4c87fbc012e3abb439"
+checksum = "075d87b46420b28b64140f2ba88fa6b158c2877466a2acdbeaf396c25e4b9b33"
 dependencies = [
  "futures-util",
  "pin-project-lite",
@@ -766,9 +766,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-checksums"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "596e886b87ac17cb182b01af6452f45ec27c49a199cfc1f2fb81136ace4141af"
+checksum = "55fe82d7463becdd632f8c6446cbdb2cbe34ad42a7d92c480d8fca08749d07a4"
 dependencies = [
  "aws-smithy-http",
  "aws-smithy-types",
@@ -787,9 +787,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-client"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7365943b56f47c9217cc8e82ac102474b62d3efa24382ab3b7d31dee069a4b04"
+checksum = "17d44078855a64d757e5c1727df29ffa6679022c38cfc4ba4e63ee9567133141"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-http",
@@ -812,9 +812,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-eventstream"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f5b51e2c7b64ddba062c157429d0c6b110fb7a3392362f9247a25aa8c12dfb5"
+checksum = "652a99272024770cbe33579dc0016914a09922b27f9a4d12f37472aacbbe71c1"
 dependencies = [
  "aws-smithy-types",
  "bytes",
@@ -823,9 +823,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-http"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f662559a3f3e448b961fdbc306c7d22162187cc02d0abe6b517bab061af9e11e"
+checksum = "b5bd86f48d7e36fb24ee922d04d79c8353e01724b1c38757ed92593179223aa7"
 dependencies = [
  "aws-smithy-eventstream",
  "aws-smithy-types",
@@ -846,9 +846,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-http-tower"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e46665291398cd939b2d8f962fa91f51aaf1db95709f0effe7a4313cfa6f2418"
+checksum = "c8972d1b4ae3aba1a10e7106fed53a5a36bc8ef86170a84f6ddd33d36fac12ad"
 dependencies = [
  "aws-smithy-http",
  "aws-smithy-types",
@@ -862,18 +862,18 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-json"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33123c0a5c876eede87c170c93cfc49ab8fd5619e3f0a1acae6047ba3f43c8c9"
+checksum = "18973f12721e27b54891386497a57e1ba09975df1c6cfeccafaf541198962aef"
 dependencies = [
  "aws-smithy-types",
 ]
 
 [[package]]
 name = "aws-smithy-protocol-test"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "56d77f3aa93a53ac9ee0f290b75c050f3356f29166e6191908c7d3619c76d755"
+checksum = "b72e9ac0818d0016ced540ba0d06975299d27684ff514173b21c9976fd72062b"
 dependencies = [
  "assert-json-diff",
  "http",
@@ -886,9 +886,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-types"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da22cf5a1529f60aeb4605311164ddb7d88bae03bf99314d9fb8e682d0a34ae5"
+checksum = "da7e499c4b15bab8eb6b234df31833cc83a1bdaa691ba72d5d81efc109d9d705"
 dependencies = [
  "base64-simd",
  "itoa 1.0.5",
@@ -899,9 +899,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-xml"
-version = "0.54.2"
+version = "0.54.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f57927651cde61bf181874694ea19b1c59f2bcb6e2efba4adaef222d570ec8d"
+checksum = "9a73082f023f4a361fe811954da0061076709198792a3d2ad3a7498e10b606a0"
 dependencies = [
  "xmlparser",
 ]
@@ -1385,9 +1385,9 @@ dependencies = [
 
 [[package]]
 name = "cxx"
-version = "1.0.89"
+version = "1.0.90"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc831ee6a32dd495436e317595e639a587aa9907bef96fe6e6abc290ab6204e9"
+checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8"
 dependencies = [
  "cc",
  "cxxbridge-flags",
@@ -1397,9 +1397,9 @@ dependencies = [
 
 [[package]]
 name = "cxx-build"
-version = "1.0.89"
+version = "1.0.90"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "94331d54f1b1a8895cd81049f7eaaaef9d05a7dcb4d1fd08bf3ff0806246789d"
+checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38"
 dependencies = [
  "cc",
  "codespan-reporting",
@@ -1412,15 +1412,15 @@ dependencies = [
 
 [[package]]
 name = "cxxbridge-flags"
-version = "1.0.89"
+version = "1.0.90"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "48dcd35ba14ca9b40d6e4b4b39961f23d835dbb8eed74565ded361d93e1feb8a"
+checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03"
 
 [[package]]
 name = "cxxbridge-macro"
-version = "1.0.89"
+version = "1.0.90"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2"
+checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1640,9 +1640,9 @@ dependencies = [
 
 [[package]]
 name = "enum-iterator"
-version = "1.2.0"
+version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91a4ec26efacf4aeff80887a175a419493cb6f8b5480d26387eb0bd038976187"
+checksum = "9ea166b3f7dc1032f7866d13f8d8e02c8d87507b61750176b86554964dc6a7bf"
 dependencies = [
  "enum-iterator-derive",
 ]
@@ -3446,9 +3446,9 @@ dependencies = [
 
 [[package]]
 name = "signal-hook"
-version = "0.3.14"
+version = "0.3.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d"
+checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9"
 dependencies = [
  "libc",
  "signal-hook-registry",
@@ -3467,9 +3467,9 @@ dependencies = [
 
 [[package]]
 name = "signal-hook-registry"
-version = "1.4.0"
+version = "1.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
+checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
 dependencies = [
  "libc",
 ]
@@ -3812,9 +3812,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-util"
-version = "0.7.6"
+version = "0.7.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc6a3b08b64e6dfad376fa2432c7b1f01522e37a623c3050bc95db2d3ff21583"
+checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
 dependencies = [
  "bytes",
  "futures-core",
"4.3", features = ["rustls"] } actix-cors = "0.6" actix-files = "0.6" -actix-web-prometheus = { version = "0.1", features = ["process"] } +actix-web-prometheus = { version = "0.1" } prometheus = { version = "0.13", features = ["process"] } anyhow = { version = "1.0", features = ["backtrace"] } arrow-schema = { version = "31.0", features = ["serde"] } @@ -40,7 +40,7 @@ futures = "0.3" fs_extra = "1.3" http = "0.2" humantime-serde = "1.1" -lazy_static = "1.4.0" +lazy_static = "1.4" log = "0.4" num_cpus = "1.15" md-5 = "0.10" @@ -67,9 +67,9 @@ actix-web-static-files = "4.0" static-files = "0.2" ulid = { version = "1.0", features = ["serde"] } ureq = { version = "2.6", features = ["json"] } -hex = "0.4.3" -itertools = "0.10.5" -xxhash-rust = { version = "0.8.6", features = ["xxh3"] } +hex = "0.4" +itertools = "0.10" +xxhash-rust = { version = "0.8", features = ["xxh3"] } [build-dependencies] static-files = "0.2" diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index caa8153d4..5f346d780 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -17,10 +17,12 @@ */ use serde::{Deserialize, Serialize}; +use std::fmt; pub mod rule; pub mod target; +use crate::metrics::ALERTS_STATES; use crate::utils::uid::Uid; pub use self::rule::Rule; @@ -51,6 +53,13 @@ impl Alert { AlertState::Listening | AlertState::Firing => (), alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => { let context = self.get_context(stream_name, alert_state); + ALERTS_STATES + .with_label_values(&[ + context.stream.as_str(), + context.alert_name.as_str(), + context.alert_state.to_string().as_str(), + ]) + .inc(); for target in &self.targets { target.call(context.clone()); } @@ -124,3 +133,14 @@ impl Default for AlertState { Self::Listening } } + +impl fmt::Display for AlertState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + AlertState::Listening => write!(f, "Listening"), + AlertState::SetToFiring => write!(f, "SetToFiring"), + AlertState::Firing => write!(f, "Firing"), + AlertState::Resolved => write!(f, "Resolved"), + } + } +} diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 6daf8dbd1..34b556f77 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -18,8 +18,10 @@ use actix_web::{web, HttpRequest, HttpResponse}; use serde_json::Value; +use std::time::Instant; use crate::event; +use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::CONFIG; use crate::query::Query; use crate::response::QueryResponse; @@ -34,17 +36,23 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const SEPARATOR: char = '^'; pub async fn query(_req: HttpRequest, json: web::Json) -> Result { + let time = Instant::now(); let json = json.into_inner(); let query = Query::parse(json)?; let storage = CONFIG.storage().get_object_store(); - let query_result = query.execute(storage).await; - - query_result + let query_result = query_result .map(Into::::into) .map(|response| response.to_http()) - .map_err(|e| e.into()) + .map_err(|e| e.into()); + + let time = time.elapsed().as_secs_f64(); + QUERY_EXECUTE_TIME + .with_label_values(&[query.stream_name.as_str()]) + .observe(time); + + query_result } // Handler for POST /api/v1/ingest diff --git a/server/src/main.rs b/server/src/main.rs index d83a5480a..176860242 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -21,7 +21,7 @@ use actix_web::dev::ServiceRequest; use actix_web::{middleware, web, App, HttpServer}; use actix_web_httpauth::extractors::basic::BasicAuth; use 
diff --git a/server/src/main.rs b/server/src/main.rs
index d83a5480a..176860242 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -21,7 +21,7 @@ use actix_web::dev::ServiceRequest;
 use actix_web::{middleware, web, App, HttpServer};
 use actix_web_httpauth::extractors::basic::BasicAuth;
 use actix_web_httpauth::middleware::HttpAuthentication;
-use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
+use actix_web_prometheus::PrometheusMetrics;
 use actix_web_static_files::ResourceFiles;
 use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
 use log::warn;
@@ -45,6 +45,7 @@ mod banner;
 mod event;
 mod handlers;
 mod metadata;
+mod metrics;
 mod migration;
 mod option;
 mod query;
@@ -69,11 +70,8 @@ async fn main() -> anyhow::Result<()> {
     CONFIG.validate_storage(&*storage).await;
     let metadata = storage::resolve_parseable_metadata().await?;
     banner::print(&CONFIG, metadata);
-    let prometheus = PrometheusMetricsBuilder::new(env!("CARGO_PKG_NAME"))
-        .registry(prometheus::default_registry().clone())
-        .endpoint("/metrics")
-        .build()
-        .unwrap();
+    let prometheus = metrics::build_metrics_handler();
+    CONFIG.storage().register_store_metrics(&prometheus);
 
     migration::run_migration(&CONFIG).await?;
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index ee4184215..628e69481 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -23,6 +23,7 @@ use std::sync::{Arc, RwLock};
 
 use crate::alerts::Alerts;
 use crate::event::Event;
+use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
 use crate::stats::{Stats, StatsCounter};
 use crate::storage::ObjectStorage;
 
@@ -180,6 +181,12 @@ impl STREAM_INFO {
         stream.stats.add_ingestion_size(size);
         stream.stats.increase_event_by_one();
+        EVENTS_INGESTED
+            .with_label_values(&[stream_name.clone(), "json"])
+            .inc();
+        EVENTS_INGESTED_SIZE
+            .with_label_values(&[stream_name.clone(), "json"])
+            .add(size as i64);
 
         Ok(())
     }
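main.rs now builds the handler via `metrics::build_metrics_handler()` and hands it to the storage config for metric registration; the actix `App` wiring itself is outside this diff. For orientation, a minimal sketch of how an `actix-web-prometheus` handler is usually mounted (the bind address is illustrative):

```rust
use actix_web::{App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;

// Sketch only: wrapping each App clone records per-request metrics and serves
// the endpoint configured on the handler (here, GET /metrics).
async fn serve(prometheus: PrometheusMetrics) -> std::io::Result<()> {
    HttpServer::new(move || App::new().wrap(prometheus.clone()))
        .bind(("0.0.0.0", 8000))?
        .run()
        .await
}
```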
diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs
new file mode 100644
index 000000000..9b8d32550
--- /dev/null
+++ b/server/src/metrics/mod.rs
@@ -0,0 +1,105 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+pub mod storage;
+
+use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
+use lazy_static::lazy_static;
+use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
+
+pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
+
+lazy_static! {
+    pub static ref EVENTS_INGESTED: IntCounterVec = IntCounterVec::new(
+        Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
+        &["stream", "format"]
+    )
+    .expect("metric can be created");
+    pub static ref EVENTS_INGESTED_SIZE: IntGaugeVec = IntGaugeVec::new(
+        Opts::new("events_ingested_size", "Events ingested size bytes")
+            .namespace(METRICS_NAMESPACE),
+        &["stream", "format"]
+    )
+    .expect("metric can be created");
+    pub static ref STORAGE_SIZE: IntGaugeVec = IntGaugeVec::new(
+        Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
+        &["stream", "format"]
+    )
+    .expect("metric can be created");
+    pub static ref STAGING_FILES: IntGaugeVec = IntGaugeVec::new(
+        Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
+        &["stream"]
+    )
+    .expect("metric can be created");
+    pub static ref QUERY_EXECUTE_TIME: HistogramVec = HistogramVec::new(
+        HistogramOpts::new("query_execute_time", "Query execute time").namespace(METRICS_NAMESPACE),
+        &["stream"]
+    )
+    .expect("metric can be created");
+    pub static ref ALERTS_STATES: IntCounterVec = IntCounterVec::new(
+        Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE),
+        &["stream", "name", "state"]
+    )
+    .expect("metric can be created");
+}
+
+fn custom_metrics(registry: &Registry) {
+    registry
+        .register(Box::new(EVENTS_INGESTED.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(EVENTS_INGESTED_SIZE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(STORAGE_SIZE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(STAGING_FILES.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(QUERY_EXECUTE_TIME.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(ALERTS_STATES.clone()))
+        .expect("metric can be registered");
+}
+
+pub fn build_metrics_handler() -> PrometheusMetrics {
+    let registry = prometheus::Registry::new();
+    custom_metrics(&registry);
+
+    let prometheus = PrometheusMetricsBuilder::new(METRICS_NAMESPACE)
+        .registry(registry)
+        .endpoint("/metrics")
+        .build()
+        .expect("Prometheus initialization");
+
+    prom_process_metrics(&prometheus);
+    prometheus
+}
+
+#[cfg(target_os = "linux")]
+fn prom_process_metrics(metrics: &PrometheusMetrics) {
+    use prometheus::process_collector::ProcessCollector;
+    metrics
+        .registry
+        .register(Box::new(ProcessCollector::for_self()))
+        .expect("metric can be registered");
+}
+#[cfg(not(target_os = "linux"))]
+fn prom_process_metrics(_metrics: &PrometheusMetrics) {}
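Because `build_metrics_handler()` registers the custom metrics on a fresh `Registry` rather than the global default one, anything that wants to inspect them must go through that registry (or the `/metrics` endpoint it exposes). A small sketch of rendering the registry by hand, e.g. as a debugging aid:

```rust
use prometheus::{Encoder, TextEncoder};

// Sketch only: encode a registry into the Prometheus text exposition format.
// Metric names come out prefixed with METRICS_NAMESPACE (the crate name).
fn dump_metrics(registry: &prometheus::Registry) -> String {
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buffer)
        .expect("text encoding");
    String::from_utf8(buffer).expect("exposition format is valid UTF-8")
}
```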
diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs
new file mode 100644
index 000000000..d8ee85ef9
--- /dev/null
+++ b/server/src/metrics/storage.rs
@@ -0,0 +1,75 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use actix_web_prometheus::PrometheusMetrics;
+
+pub trait StorageMetrics {
+    fn register_metrics(&self, handler: &PrometheusMetrics);
+}
+
+pub mod localfs {
+    use crate::{metrics::METRICS_NAMESPACE, storage::FSConfig};
+    use lazy_static::lazy_static;
+    use prometheus::{HistogramOpts, HistogramVec};
+
+    use super::StorageMetrics;
+
+    lazy_static! {
+        pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
+            HistogramOpts::new("local_fs_response_time", "FileSystem Request Latency")
+                .namespace(METRICS_NAMESPACE),
+            &["method", "status"]
+        )
+        .expect("metric can be created");
+    }
+
+    impl StorageMetrics for FSConfig {
+        fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+            handler
+                .registry
+                .register(Box::new(REQUEST_RESPONSE_TIME.clone()))
+                .expect("metric can be registered");
+        }
+    }
+}
+
+pub mod s3 {
+    use crate::{metrics::METRICS_NAMESPACE, storage::S3Config};
+    use lazy_static::lazy_static;
+    use prometheus::{HistogramOpts, HistogramVec};
+
+    use super::StorageMetrics;
+
+    lazy_static! {
+        pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
+            HistogramOpts::new("s3_response_time", "S3 Request Latency")
+                .namespace(METRICS_NAMESPACE),
+            &["method", "status"]
+        )
+        .expect("metric can be created");
+    }
+
+    impl StorageMetrics for S3Config {
+        fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+            handler
+                .registry
+                .register(Box::new(REQUEST_RESPONSE_TIME.clone()))
+                .expect("metric can be registered");
+        }
+    }
+}
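The `StorageMetrics` trait, together with the `ObjectStorageProvider: StorageMetrics` bound added in object_storage.rs further down, is what obliges every storage backend to bring its own latency histogram. A hypothetical third backend (nothing like it exists in this PR) would follow the same shape:

```rust
// Hypothetical module, for illustration only.
pub mod blob_store {
    use crate::metrics::METRICS_NAMESPACE;
    use lazy_static::lazy_static;
    use prometheus::{HistogramOpts, HistogramVec};

    lazy_static! {
        pub static ref REQUEST_RESPONSE_TIME: HistogramVec = HistogramVec::new(
            HistogramOpts::new("blob_store_response_time", "Blob Store Request Latency")
                .namespace(METRICS_NAMESPACE),
            &["method", "status"]
        )
        .expect("metric can be created");
    }

    // A BlobStoreConfig type would then implement super::StorageMetrics by
    // registering REQUEST_RESPONSE_TIME on handler.registry, exactly as
    // FSConfig and S3Config do above.
}
```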
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index be426d553..dccce14a7 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -19,6 +19,7 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
+    time::Instant,
 };
 
 use async_trait::async_trait;
@@ -38,6 +39,7 @@ use relative_path::RelativePath;
 use tokio::fs;
 use tokio_stream::wrappers::ReadDirStream;
 
+use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::{option::validation, utils::validate_path_is_writeable};
 
 use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
@@ -75,6 +77,10 @@ impl ObjectStorageProvider for FSConfig {
     fn get_endpoint(&self) -> String {
         self.root.to_str().unwrap().to_string()
     }
+
+    fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+        self.register_metrics(handler);
+    }
 }
 
 pub struct LocalFS {
@@ -94,8 +100,9 @@ impl LocalFS {
 #[async_trait]
 impl ObjectStorage for LocalFS {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
+        let time = Instant::now();
         let file_path = self.path_in_root(path);
-        match fs::read(file_path).await {
+        let res: Result<Bytes, ObjectStorageError> = match fs::read(file_path).await {
             Ok(x) => Ok(x.into()),
             Err(e) => match e.kind() {
                 std::io::ErrorKind::NotFound => {
@@ -103,7 +110,14 @@ impl ObjectStorage for LocalFS {
                 }
                 _ => Err(ObjectStorageError::UnhandledError(Box::new(e))),
             },
-        }
+        };
+
+        let status = if res.is_ok() { "200" } else { "400" };
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["GET", status])
+            .observe(time);
+        res
     }
 
     async fn put_object(
         &self,
         path: &RelativePath,
         resource: Bytes,
     ) -> Result<(), ObjectStorageError> {
+        let time = Instant::now();
+
         let path = self.path_in_root(path);
         if let Some(parent) = path.parent() {
             fs::create_dir_all(parent).await?;
         }
-        Ok(fs::write(path, resource).await?)
+        let res = fs::write(path, resource).await;
+
+        let status = if res.is_ok() { "200" } else { "400" };
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["PUT", status])
+            .observe(time);
+
+        res.map_err(Into::into)
     }
 
     async fn check(&self) -> Result<(), ObjectStorageError> {
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index d8d3a2bf4..758fd9cc3 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -21,10 +21,15 @@ use super::{
     Permisssion, StorageDir, StorageMetadata, CACHED_FILES,
 };
 use crate::{
-    alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, stats::Stats,
+    alerts::Alerts,
+    metadata::STREAM_INFO,
+    metrics::{storage::StorageMetrics, STAGING_FILES, STORAGE_SIZE},
+    option::CONFIG,
+    stats::Stats,
     utils::batch_adapter::adapt_batch,
 };
 
+use actix_web_prometheus::PrometheusMetrics;
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::arrow::datatypes::Schema;
@@ -58,10 +63,11 @@ pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
 const SCHEMA_FILE_NAME: &str = ".schema";
 const ALERT_FILE_NAME: &str = ".alert.json";
 
-pub trait ObjectStorageProvider {
+pub trait ObjectStorageProvider: StorageMetrics {
     fn get_datafusion_runtime(&self) -> Arc;
     fn get_object_store(&self) -> Arc;
     fn get_endpoint(&self) -> String;
+    fn register_store_metrics(&self, handler: &PrometheusMetrics);
 }
 
 #[async_trait]
@@ -227,8 +233,15 @@ pub trait ObjectStorage: Sync + 'static {
         // Do not include file which is being written to
         let time = chrono::Utc::now().naive_utc();
         let staging_files = dir.arrow_files_grouped_exclude_time(time);
+        if staging_files.is_empty() {
+            STAGING_FILES.with_label_values(&[stream]).set(0);
+        }
 
         for (parquet_path, files) in staging_files {
+            STAGING_FILES
+                .with_label_values(&[stream])
+                .set(files.len() as i64);
+
             let record_reader = MergedRecordReader::try_new(&files).unwrap();
 
             let mut parquet_table = CACHED_FILES.lock().unwrap();
@@ -315,6 +328,9 @@ pub trait ObjectStorage: Sync + 'static {
         for (stream, compressed_size) in stream_stats {
             let stats = STREAM_INFO.read().unwrap().get(stream).map(|metadata| {
                 metadata.stats.add_storage_size(compressed_size);
+                STORAGE_SIZE
+                    .with_label_values(&[stream, "parquet"])
+                    .add(compressed_size as i64);
                 Stats::from(&metadata.stats)
             });
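The localfs hunks above and the s3.rs hunks below repeat the same sequence around every storage call: take an `Instant`, await the operation, derive a "200"/"400" status label, observe the elapsed seconds. A sketch of how that boilerplate could be factored into one helper (the helper is not part of this PR):

```rust
use std::future::Future;
use std::time::Instant;

use prometheus::HistogramVec;

// Sketch only: time a storage future and record it under ["method", "status"],
// mirroring what the instrumented call sites do inline.
async fn observed<T, E, F>(histogram: &HistogramVec, method: &str, fut: F) -> Result<T, E>
where
    F: Future<Output = Result<T, E>>,
{
    let start = Instant::now();
    let res = fut.await;
    let status = if res.is_ok() { "200" } else { "400" };
    histogram
        .with_label_values(&[method, status])
        .observe(start.elapsed().as_secs_f64());
    res
}
```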
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 6a3e4ae8f..10e77a454 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -45,7 +45,9 @@ use relative_path::RelativePath;
 use std::iter::Iterator;
 use std::path::Path;
 use std::sync::Arc;
+use std::time::Instant;
 
+use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 
 use super::ObjectStorageProvider;
@@ -152,6 +154,10 @@ impl ObjectStorageProvider for S3Config {
     fn get_endpoint(&self) -> String {
         format!("{}/{}", self.endpoint_url, self.bucket_name)
     }
+
+    fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+        self.register_metrics(handler)
+    }
 }
 
 pub struct S3 {
@@ -162,16 +168,58 @@ impl S3 {
     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, AwsSdkError> {
+        let instant = Instant::now();
+
         let resp = self
             .client
             .get_object()
             .bucket(&self.bucket)
             .key(path.as_str())
             .send()
-            .await?;
-        let body = resp.body.collect().await;
-        let body_bytes = body.unwrap().into_bytes();
-        Ok(body_bytes)
+            .await;
+
+        match resp {
+            Ok(resp) => {
+                let time = instant.elapsed().as_secs_f64();
+                REQUEST_RESPONSE_TIME
+                    .with_label_values(&["GET", "200"])
+                    .observe(time);
+                let body = resp.body.collect().await.unwrap().into_bytes();
+                Ok(body)
+            }
+            Err(err) => {
+                let time = instant.elapsed().as_secs_f64();
+                REQUEST_RESPONSE_TIME
+                    .with_label_values(&["GET", "400"])
+                    .observe(time);
+                Err(err.into())
+            }
+        }
+    }
+
+    async fn _put_object(
+        &self,
+        path: &RelativePath,
+        resource: Bytes,
+        md5: Option<String>,
+    ) -> Result<(), AwsSdkError> {
+        let time = Instant::now();
+        let resp = self
+            .client
+            .put_object()
+            .bucket(&self.bucket)
+            .key(path.as_str())
+            .body(resource.into())
+            .set_content_md5(md5)
+            .send()
+            .await;
+        let status = if resp.is_ok() { "200" } else { "400" };
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["PUT", status])
+            .observe(time);
+
+        resp.map(|_| ()).map_err(|err| err.into())
     }
 
     async fn _delete_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> {
@@ -235,6 +283,7 @@
     ) -> Result<(), AwsSdkError> {
         let body = ByteStream::from_path(&path).await.unwrap();
 
+        let instant = Instant::now();
         let resp = self
             .client
             .put_object()
@@ -243,11 +292,16 @@ impl S3 {
             .body(body)
             .set_content_md5(md5)
             .send()
-            .await?;
+            .await;
 
-        log::trace!("{:?}", resp);
+        let status = if resp.is_ok() { "200" } else { "400" };
+        let time = instant.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["UPLOAD_PARQUET", status])
+            .observe(time);
 
-        Ok(())
+        log::trace!("{:?}", resp);
+        resp.map(|_| ()).map_err(|err| err.into())
     }
 }
@@ -268,13 +322,7 @@ impl ObjectStorage for S3 {
             BASE64.encode(hash.finalize())
         });
 
-        self.client
-            .put_object()
-            .bucket(&self.bucket)
-            .key(path.as_str())
-            .body(resource.into())
-            .set_content_md5(hash)
-            .send()
+        self._put_object(path, resource, hash)
             .await
             .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?;
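One way to sanity-check the new S3 instrumentation from a test is to compare histogram sample counts before and after a call; no such test is added by this diff, so the helper below is purely illustrative:

```rust
use crate::metrics::storage::s3::REQUEST_RESPONSE_TIME;

// Illustrative helper: how many observations the S3 latency histogram has
// recorded for a given method/status pair, e.g. ("GET", "200").
fn s3_sample_count(method: &str, status: &str) -> u64 {
    REQUEST_RESPONSE_TIME
        .with_label_values(&[method, status])
        .get_sample_count()
}
```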