diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 0cc757bff..baf0894aa 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -58,7 +58,8 @@ use super::modal::{IngestorMetadata, Metadata, NodeMetadata, NodeType, QuerierMe
 use super::rbac::RBACError;
 use super::role::RoleError;
 
-pub const INTERNAL_STREAM_NAME: &str = "pmeta";
+pub const PMETA_STREAM_NAME: &str = "pmeta";
+pub const BILLING_METRICS_STREAM_NAME: &str = "pbilling";
 
 const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
@@ -68,6 +69,217 @@ lazy_static! {
     static ref LAST_USED_QUERIER: Arc<RwLock<Option<QuerierMetadata>>> = Arc::new(RwLock::new(None));
 }
 
+#[derive(Debug, serde::Serialize, Clone)]
+pub struct BillingMetricEvent {
+    pub node_address: String,
+    pub node_type: String,
+    pub metric_type: String,
+    pub date: String,
+    pub value: u64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub method: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub provider: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub model: Option<String>,
+    pub event_type: String,
+    pub event_time: chrono::NaiveDateTime,
+}
+
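Worth noting how these events serialize: the `skip_serializing_if` attributes drop the `method`/`provider`/`model` dimensions from the JSON whenever they are `None`, so simple date metrics stay flat while object-store and LLM metrics carry their extra labels. A minimal, self-contained sketch of that behavior (assuming `serde`, `serde_json`, and `chrono` with its `serde` feature as dependencies; the struct here is a stand-in for illustration, not the type from the diff):

```rust
use chrono::Utc;
use serde::Serialize;

// Stand-in mirroring the relevant parts of BillingMetricEvent.
#[derive(Serialize)]
struct Event {
    metric_type: String,
    date: String,
    value: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    method: Option<String>,
    event_time: chrono::NaiveDateTime, // needs chrono's "serde" feature
}

fn main() {
    let simple = Event {
        metric_type: "total_events_ingested".into(),
        date: "2024-01-01".into(),
        value: 42,
        method: None, // omitted from the JSON entirely
        event_time: Utc::now().naive_utc(),
    };
    // Prints e.g. {"metric_type":"total_events_ingested","date":"2024-01-01","value":42,"event_time":"..."}
    println!("{}", serde_json::to_string(&simple).unwrap());
}
```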
+// Internal structure for collecting metrics from prometheus
+#[derive(Debug, Default)]
+struct BillingMetricsCollector {
+    pub node_address: String,
+    pub node_type: String,
+    pub total_events_ingested_by_date: HashMap<String, u64>,
+    pub total_events_ingested_size_by_date: HashMap<String, u64>,
+    pub total_parquets_stored_by_date: HashMap<String, u64>,
+    pub total_parquets_stored_size_by_date: HashMap<String, u64>,
+    pub total_query_calls_by_date: HashMap<String, u64>,
+    pub total_files_scanned_in_query_by_date: HashMap<String, u64>,
+    pub total_bytes_scanned_in_query_by_date: HashMap<String, u64>,
+    pub total_object_store_calls_by_date: HashMap<String, HashMap<String, u64>>, // method -> date -> count
+    pub total_files_scanned_in_object_store_calls_by_date: HashMap<String, HashMap<String, u64>>,
+    pub total_bytes_scanned_in_object_store_calls_by_date: HashMap<String, HashMap<String, u64>>,
+    pub total_input_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>, // provider -> model -> date -> count
+    pub total_output_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>,
+    pub event_time: chrono::NaiveDateTime,
+}
+
+impl BillingMetricsCollector {
+    pub fn new(node_address: String, node_type: String) -> Self {
+        Self {
+            node_address,
+            node_type,
+            event_time: Utc::now().naive_utc(),
+            ..Default::default()
+        }
+    }
+
+    /// Convert the collector into individual billing metric events, excluding empty collections
+    pub fn into_events(self) -> Vec<BillingMetricEvent> {
+        let mut events = Vec::new();
+
+        // Add all different types of metrics
+        self.add_simple_metrics(&mut events);
+        self.add_object_store_metrics(&mut events);
+        self.add_llm_metrics(&mut events);
+
+        events
+    }
+
+    /// Add simple date-based metrics to the events vector
+    fn add_simple_metrics(&self, events: &mut Vec<BillingMetricEvent>) {
+        let add_simple_metric = |events: &mut Vec<BillingMetricEvent>,
+                                 metric_type: &str,
+                                 data: &HashMap<String, u64>| {
+            for (date, value) in data {
+                if *value > 0 {
+                    events.push(BillingMetricEvent {
+                        node_address: self.node_address.clone(),
+                        node_type: self.node_type.clone(),
+                        metric_type: metric_type.to_string(),
+                        date: date.clone(),
+                        value: *value,
+                        method: None,
+                        provider: None,
+                        model: None,
+                        event_type: "billing-metrics".to_string(),
+                        event_time: self.event_time,
+                    });
+                }
+            }
+        };
+
+        // Add simple metrics (only if not empty)
+        if !self.total_events_ingested_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_events_ingested",
+                &self.total_events_ingested_by_date,
+            );
+        }
+        if !self.total_events_ingested_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_events_ingested_size",
+                &self.total_events_ingested_size_by_date,
+            );
+        }
+        if !self.total_parquets_stored_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_parquets_stored",
+                &self.total_parquets_stored_by_date,
+            );
+        }
+        if !self.total_parquets_stored_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_parquets_stored_size",
+                &self.total_parquets_stored_size_by_date,
+            );
+        }
+        if !self.total_query_calls_by_date.is_empty() {
+            add_simple_metric(events, "total_query_calls", &self.total_query_calls_by_date);
+        }
+        if !self.total_files_scanned_in_query_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_files_scanned_in_query",
+                &self.total_files_scanned_in_query_by_date,
+            );
+        }
+        if !self.total_bytes_scanned_in_query_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_bytes_scanned_in_query",
+                &self.total_bytes_scanned_in_query_by_date,
+            );
+        }
+    }
+
+    /// Add object store metrics (method-based) to the events vector
+    fn add_object_store_metrics(&self, events: &mut Vec<BillingMetricEvent>) {
+        let object_store_metrics = [
+            (
+                "total_object_store_calls",
+                &self.total_object_store_calls_by_date,
+            ),
+            (
+                "total_files_scanned_in_object_store_calls",
+                &self.total_files_scanned_in_object_store_calls_by_date,
+            ),
+            (
+                "total_bytes_scanned_in_object_store_calls",
+                &self.total_bytes_scanned_in_object_store_calls_by_date,
+            ),
+        ];
+
+        for (metric_type, data) in object_store_metrics {
+            if !data.is_empty() {
+                for (method, dates) in data {
+                    for (date, value) in dates {
+                        if *value > 0 {
+                            events.push(BillingMetricEvent {
+                                node_address: self.node_address.clone(),
+                                node_type: self.node_type.clone(),
+                                metric_type: metric_type.to_string(),
+                                date: date.clone(),
+                                value: *value,
+                                method: Some(method.clone()),
+                                provider: None,
+                                model: None,
+                                event_type: "billing-metrics".to_string(),
+                                event_time: self.event_time,
+                            });
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Add LLM metrics (provider/model-based) to the events vector
+    fn add_llm_metrics(&self, events: &mut Vec<BillingMetricEvent>) {
+        let llm_metrics = [
+            (
+                "total_input_llm_tokens",
+                &self.total_input_llm_tokens_by_date,
+            ),
+            (
+                "total_output_llm_tokens",
+                &self.total_output_llm_tokens_by_date,
+            ),
+        ];
+
+        for (metric_type, data) in llm_metrics {
+            if !data.is_empty() {
+                for (provider, models) in data {
+                    for (model, dates) in models {
+                        for (date, value) in dates {
+                            if *value > 0 {
+                                events.push(BillingMetricEvent {
+                                    node_address: self.node_address.clone(),
+                                    node_type: self.node_type.clone(),
+                                    metric_type: metric_type.to_string(),
+                                    date: date.clone(),
+                                    value: *value,
+                                    method: None,
+                                    provider: Some(provider.clone()),
+                                    model: Some(model.clone()),
+                                    event_type: "billing-metrics".to_string(),
+                                    event_time: self.event_time,
+                                });
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
 pub async fn for_each_live_ingestor<F, Fut, E>(api_fn: F) -> Result<(), E>
 where
     F: Fn(NodeMetadata) -> Fut + Clone + Send + Sync + 'static,
@@ -1006,6 +1218,341 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     Ok(all_metrics)
 }
 
+/// Extracts billing metrics from prometheus samples
+fn extract_billing_metrics_from_samples(
+    samples: Vec<prometheus_parse::Sample>,
+    node_address: String,
+    node_type: String,
+) -> Vec<BillingMetricEvent> {
+    let mut collector = BillingMetricsCollector::new(node_address, node_type);
+
+    for sample in samples {
+        if let prometheus_parse::Value::Counter(val) = sample.value {
+            process_sample(&mut collector, &sample, val);
+        }
+    }
+
+    // Convert to flattened events, excluding empty collections
+    collector.into_events()
+}
+
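For context on the parsing step feeding this extractor: a minimal sketch of the `prometheus_parse` scrape path (the same crate this diff calls into), with an invented two-line exposition text standing in for a node's `/metrics` response. Only `Value::Counter` samples survive the filter, mirroring `extract_billing_metrics_from_samples`:

```rust
use std::io;

fn main() {
    // Invented exposition lines; the real text comes from a node's /metrics endpoint.
    let text = concat!(
        "# TYPE parseable_total_events_ingested_by_date counter\n",
        "parseable_total_events_ingested_by_date{date=\"2024-01-01\"} 42\n",
    );

    // Scrape::parse consumes an iterator of io::Result<String> lines.
    let lines: Vec<io::Result<String>> = text.lines().map(|l| Ok(l.to_owned())).collect();
    let scrape = prometheus_parse::Scrape::parse(lines.into_iter()).expect("valid exposition");

    for sample in scrape.samples {
        // Keep only counters, as the extractor above does.
        if let prometheus_parse::Value::Counter(val) = sample.value {
            println!("{} date={:?} value={}", sample.metric, sample.labels.get("date"), val);
        }
    }
}
```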
+/// Process a single prometheus sample and update the collector
+fn process_sample(
+    collector: &mut BillingMetricsCollector,
+    sample: &prometheus_parse::Sample,
+    val: f64,
+) {
+    match sample.metric.as_str() {
+        metric if is_simple_metric(metric) => {
+            process_simple_metric(collector, metric, &sample.labels, val);
+        }
+        metric if is_object_store_metric(metric) => {
+            process_object_store_metric(collector, metric, &sample.labels, val);
+        }
+        metric if is_llm_metric(metric) => {
+            process_llm_metric(collector, metric, &sample.labels, val);
+        }
+        _ => {}
+    }
+}
+
+/// Check if a metric is a simple date-based metric
+fn is_simple_metric(metric: &str) -> bool {
+    matches!(
+        metric,
+        "parseable_total_events_ingested_by_date"
+            | "parseable_total_events_ingested_size_by_date"
+            | "parseable_total_parquets_stored_by_date"
+            | "parseable_total_parquets_stored_size_by_date"
+            | "parseable_total_query_calls_by_date"
+            | "parseable_total_files_scanned_in_query_by_date"
+            | "parseable_total_bytes_scanned_in_query_by_date"
+    )
+}
+
+/// Check if a metric is an object store metric (requires method label)
+fn is_object_store_metric(metric: &str) -> bool {
+    matches!(
+        metric,
+        "parseable_total_object_store_calls_by_date"
+            | "parseable_total_files_scanned_in_object_store_calls_by_date"
+            | "parseable_total_bytes_scanned_in_object_store_calls_by_date"
+    )
+}
+
+/// Check if a metric is an LLM metric (requires provider and model labels)
+fn is_llm_metric(metric: &str) -> bool {
+    matches!(
+        metric,
+        "parseable_total_input_llm_tokens_by_date" | "parseable_total_output_llm_tokens_by_date"
+    )
+}
+
+/// Process simple metrics that only require a date label
+fn process_simple_metric(
+    collector: &mut BillingMetricsCollector,
+    metric: &str,
+    labels: &std::collections::HashMap<String, String>,
+    val: f64,
+) {
+    if let Some(date) = labels.get("date") {
+        let value = val as u64;
+        match metric {
+            "parseable_total_events_ingested_by_date" => {
+                collector
+                    .total_events_ingested_by_date
+                    .insert(date.to_string(), value);
+            }
+            "parseable_total_events_ingested_size_by_date" => {
+                collector
+                    .total_events_ingested_size_by_date
+                    .insert(date.to_string(), value);
+            }
+            "parseable_total_parquets_stored_by_date" => {
+                collector
+                    .total_parquets_stored_by_date
+                    .insert(date.to_string(), value);
+            }
+            "parseable_total_parquets_stored_size_by_date" => {
+                collector
+                    .total_parquets_stored_size_by_date
+                    .insert(date.to_string(), value);
+            }
+            "parseable_total_query_calls_by_date" => {
+                collector
+                    .total_query_calls_by_date
+                    .insert(date.to_string(), value);
+            }
+            "parseable_total_files_scanned_in_query_by_date" => {
+                collector
+                    .total_files_scanned_in_query_by_date
+                    .insert(date.to_string(), value);
+            }
+            "parseable_total_bytes_scanned_in_query_by_date" => {
+                collector
+                    .total_bytes_scanned_in_query_by_date
+                    .insert(date.to_string(), value);
+            }
+            _ => {}
+        }
+    }
+}
+
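The `val as u64` conversions above are safer than they might look: Rust float-to-integer `as` casts saturate (since 1.45) rather than wrap, and NaN maps to zero, so a malformed or oversized counter value cannot corrupt a tally. A quick demonstration:

```rust
// Float-to-int `as` casts saturate rather than wrap (Rust >= 1.45),
// which is what `let value = val as u64;` above relies on.
fn main() {
    assert_eq!(42.9_f64 as u64, 42);            // fractional part truncated
    assert_eq!(-1.0_f64 as u64, 0);             // negatives saturate to 0
    assert_eq!(f64::INFINITY as u64, u64::MAX); // saturates at the top
    assert_eq!(f64::NAN as u64, 0);             // NaN maps to 0
    println!("float-to-int casts are saturating");
}
```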
+/// Process object store metrics that require method and date labels
+fn process_object_store_metric(
+    collector: &mut BillingMetricsCollector,
+    metric: &str,
+    labels: &std::collections::HashMap<String, String>,
+    val: f64,
+) {
+    if let (Some(method), Some(date)) = (labels.get("method"), labels.get("date")) {
+        let value = val as u64;
+        let target_map = match metric {
+            "parseable_total_object_store_calls_by_date" => {
+                &mut collector.total_object_store_calls_by_date
+            }
+            "parseable_total_files_scanned_in_object_store_calls_by_date" => {
+                &mut collector.total_files_scanned_in_object_store_calls_by_date
+            }
+            "parseable_total_bytes_scanned_in_object_store_calls_by_date" => {
+                &mut collector.total_bytes_scanned_in_object_store_calls_by_date
+            }
+            _ => return,
+        };
+
+        target_map
+            .entry(method.to_string())
+            .or_insert_with(HashMap::new)
+            .insert(date.to_string(), value);
+    }
+}
+
+/// Process LLM metrics that require provider, model, and date labels
+fn process_llm_metric(
+    collector: &mut BillingMetricsCollector,
+    metric: &str,
+    labels: &std::collections::HashMap<String, String>,
+    val: f64,
+) {
+    if let (Some(provider), Some(model), Some(date)) = (
+        labels.get("provider"),
+        labels.get("model"),
+        labels.get("date"),
+    ) {
+        let value = val as u64;
+        let target_map = match metric {
+            "parseable_total_input_llm_tokens_by_date" => {
+                &mut collector.total_input_llm_tokens_by_date
+            }
+            "parseable_total_output_llm_tokens_by_date" => {
+                &mut collector.total_output_llm_tokens_by_date
+            }
+            _ => return,
+        };
+
+        target_map
+            .entry(provider.to_string())
+            .or_insert_with(HashMap::new)
+            .entry(model.to_string())
+            .or_insert_with(HashMap::new)
+            .insert(date.to_string(), value);
+    }
+}
+
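The nested `method -> date -> count` maps above are populated with the `HashMap` entry API; `or_default()` is the clippy-preferred equivalent of the `or_insert_with(HashMap::new)` spelling the diff uses. A small self-contained sketch of the same two-level pattern:

```rust
use std::collections::HashMap;

fn main() {
    // method -> date -> count, as in the collector above.
    let mut calls: HashMap<String, HashMap<String, u64>> = HashMap::new();

    for (method, date, count) in [("GET", "2024-01-01", 7u64), ("PUT", "2024-01-01", 3)] {
        calls
            .entry(method.to_string())
            .or_default() // same effect as or_insert_with(HashMap::new)
            .insert(date.to_string(), count);
    }

    assert_eq!(calls["GET"]["2024-01-01"], 7);
    println!("{calls:?}");
}
```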
+/// Fetches billing metrics for a single node
+async fn fetch_node_billing_metrics<T>(node: &T) -> Result<Vec<BillingMetricEvent>, PostError>
+where
+    T: Metadata + Send + Sync + 'static,
+{
+    // Format the metrics URL
+    let uri = Url::parse(&format!(
+        "{}{}/metrics",
+        node.domain_name(),
+        base_path_without_preceding_slash()
+    ))
+    .map_err(|err| PostError::Invalid(anyhow::anyhow!("Invalid URL in node metadata: {}", err)))?;
+
+    // Check if the node is live
+    if !check_liveness(node.domain_name()).await {
+        warn!("node {} is not live", node.domain_name());
+        return Ok(Vec::new());
+    }
+
+    // Fetch metrics
+    let res = INTRA_CLUSTER_CLIENT
+        .get(uri)
+        .header(header::AUTHORIZATION, node.token())
+        .header(header::CONTENT_TYPE, "application/json")
+        .send()
+        .await;
+
+    match res {
+        Ok(res) => {
+            let text = res.text().await.map_err(PostError::NetworkError)?;
+            let lines: Vec<Result<String, std::io::Error>> =
+                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
+
+            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
+                .map_err(|err| PostError::CustomError(err.to_string()))?
+                .samples;
+
+            let billing_metrics = extract_billing_metrics_from_samples(
+                sample,
+                node.domain_name().to_string(),
+                node.node_type().to_string(),
+            );
+
+            Ok(billing_metrics)
+        }
+        Err(_) => {
+            warn!(
+                "Failed to fetch billing metrics from node: {}",
+                node.domain_name()
+            );
+            Ok(Vec::new())
+        }
+    }
+}
+
+/// Fetches billing metrics from multiple nodes in parallel
+async fn fetch_nodes_billing_metrics<T>(nodes: Vec<T>) -> Result<Vec<BillingMetricEvent>, PostError>
+where
+    T: Metadata + Send + Sync + 'static,
+{
+    let nodes_len = nodes.len();
+    if nodes_len == 0 {
+        return Ok(vec![]);
+    }
+
+    let results = stream::iter(nodes)
+        .map(|node| async move { fetch_node_billing_metrics(&node).await })
+        .buffer_unordered(nodes_len) // concurrency bounded only by the node count
+        .collect::<Vec<_>>()
+        .await;
+
+    // Collect results, filtering out errors and flattening events
+    let mut billing_metrics = Vec::new();
+    for result in results {
+        match result {
+            Ok(metrics) => billing_metrics.extend(metrics), // Flatten all events from all nodes
+            Err(err) => {
+                error!("Error fetching billing metrics: {:?}", err);
+                // Continue with other nodes instead of failing the entire operation
+            }
+        }
+    }
+
+    Ok(billing_metrics)
+}
+
+/// Main function to fetch billing metrics from all nodes
+async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, PostError> {
+    // Get all node types metadata concurrently
+    let (prism_result, querier_result, ingestor_result, indexer_result) = future::join4(
+        get_node_info(NodeType::Prism),
+        get_node_info(NodeType::Querier),
+        get_node_info(NodeType::Ingestor),
+        get_node_info(NodeType::Indexer),
+    )
+    .await;
+
+    // Handle results
+    let prism_metadata: Vec<NodeMetadata> = prism_result.map_err(|err| {
+        error!("Failed to get prism info for billing metrics: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    let querier_metadata: Vec<NodeMetadata> = querier_result.map_err(|err| {
+        error!("Failed to get querier info for billing metrics: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    let ingestor_metadata: Vec<NodeMetadata> = ingestor_result.map_err(|err| {
+        error!("Failed to get ingestor info for billing metrics: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    let indexer_metadata: Vec<NodeMetadata> = indexer_result.map_err(|err| {
+        error!("Failed to get indexer info for billing metrics: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    // Fetch billing metrics from all nodes concurrently
+    let (prism_metrics, querier_metrics, ingestor_metrics, indexer_metrics) = future::join4(
+        fetch_nodes_billing_metrics(prism_metadata),
+        fetch_nodes_billing_metrics(querier_metadata),
+        fetch_nodes_billing_metrics(ingestor_metadata),
+        fetch_nodes_billing_metrics(indexer_metadata),
+    )
+    .await;
+
+    // Combine all billing metrics
+    let mut all_billing_metrics = Vec::new();
+
+    // Add metrics from all node types
+    match prism_metrics {
+        Ok(metrics) => all_billing_metrics.extend(metrics),
+        Err(err) => error!("Error fetching prism billing metrics: {:?}", err),
+    }
+
+    match querier_metrics {
+        Ok(metrics) => all_billing_metrics.extend(metrics),
+        Err(err) => error!("Error fetching querier billing metrics: {:?}", err),
+    }
+
+    match ingestor_metrics {
+        Ok(metrics) => all_billing_metrics.extend(metrics),
+        Err(err) => error!("Error fetching ingestor billing metrics: {:?}", err),
+    }
+
+    match indexer_metrics {
+        Ok(metrics) => all_billing_metrics.extend(metrics),
+        Err(err) => error!("Error fetching indexer billing metrics: {:?}", err),
+    }
+
+    Ok(all_billing_metrics)
+}
+
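The fan-out in `fetch_nodes_billing_metrics` in miniature: `buffer_unordered` drives all per-node futures concurrently (here, as in the diff, the limit equals the node count) and yields results in completion order, so one slow or failed node never blocks the rest. A runnable sketch assuming the `futures` and `tokio` crates, with a stand-in closure for the real request:

```rust
use futures::{StreamExt, stream};

#[tokio::main]
async fn main() {
    let nodes = vec!["node-a", "node-b", "node-c"];
    let n = nodes.len();

    let results: Vec<Result<String, String>> = stream::iter(nodes)
        .map(|node| async move {
            // Stand-in for a per-node metrics request.
            Ok::<_, String>(format!("metrics from {node}"))
        })
        .buffer_unordered(n) // concurrency capped at the node count
        .collect()
        .await;

    // Per-node failures are logged and skipped rather than aborting the batch,
    // matching the error handling in the diff.
    for r in results {
        match r {
            Ok(m) => println!("{m}"),
            Err(e) => eprintln!("skipping node: {e}"),
        }
    }
}
```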
ingestion"); let mut scheduler = AsyncScheduler::new(); @@ -1013,15 +1560,16 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { .every(CLUSTER_METRICS_INTERVAL_SECONDS) .run(move || async { let result: Result<(), PostError> = async { + // Fetch regular cluster metrics let cluster_metrics = fetch_cluster_metrics().await; if let Ok(metrics) = cluster_metrics && !metrics.is_empty() { - info!("Cluster metrics fetched successfully from all ingestors"); + info!("Cluster metrics fetched successfully from all nodes"); if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { if matches!( ingest_internal_stream( - INTERNAL_STREAM_NAME.to_string(), + PMETA_STREAM_NAME.to_string(), bytes::Bytes::from(metrics_bytes), ) .await, @@ -1035,6 +1583,38 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { error!("Failed to serialize cluster metrics"); } } + + // Fetch billing metrics + match fetch_cluster_billing_metrics().await { + Ok(metrics) if !metrics.is_empty() => { + info!("Billing metrics fetched successfully from all nodes"); + // Optionally add: trace!("Billing metrics: {:?}", metrics); + if let Ok(billing_metrics_bytes) = serde_json::to_vec(&metrics) { + if matches!( + ingest_internal_stream( + BILLING_METRICS_STREAM_NAME.to_string(), + bytes::Bytes::from(billing_metrics_bytes), + ) + .await, + Ok(()) + ) { + info!("Billing metrics successfully ingested into billing-metrics stream"); + } else { + error!("Failed to ingest billing metrics into billing-metrics stream"); + } + } else { + error!("Failed to serialize billing metrics"); + } + } + Ok(_) => { + // Empty metrics result + info!("No billing metrics to ingest (empty result)"); + } + Err(err) => { + error!("Error fetching billing metrics: {:?}", err); + } + } + Ok(()) } .await; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index a86888baa..91229b7be 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -136,7 +136,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); let mut p_custom_fields = HashMap::new(); p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); - p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string()); + p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Json.to_string()); // For internal streams, use old schema format::json::Event::new(json.into_inner(), Utc::now()) .into_event( diff --git a/src/hottier.rs b/src/hottier.rs index 10eb64740..a4e78bca6 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -24,7 +24,7 @@ use std::{ use crate::{ catalog::manifest::{File, Manifest}, - handlers::http::cluster::INTERNAL_STREAM_NAME, + handlers::http::cluster::PMETA_STREAM_NAME, parseable::PARSEABLE, storage::{ObjectStorageError, field_stats::DATASET_STATS_STREAM_NAME}, utils::{extract_datetime, human_size::bytes_to_human_size}, @@ -697,7 +697,7 @@ impl HotTierManager { } pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> { - if !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) { + if !self.check_stream_hot_tier_exists(PMETA_STREAM_NAME) { let mut stream_hot_tier = StreamHotTier { version: Some(CURRENT_HOT_TIER_VERSION.to_string()), size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES, @@ -705,7 +705,7 @@ impl HotTierManager { available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES, oldest_date_time_entry: None, }; - self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier) + 
diff --git a/src/hottier.rs b/src/hottier.rs
index 10eb64740..a4e78bca6 100644
--- a/src/hottier.rs
+++ b/src/hottier.rs
@@ -24,7 +24,7 @@ use std::{
 
 use crate::{
     catalog::manifest::{File, Manifest},
-    handlers::http::cluster::INTERNAL_STREAM_NAME,
+    handlers::http::cluster::PMETA_STREAM_NAME,
     parseable::PARSEABLE,
     storage::{ObjectStorageError, field_stats::DATASET_STATS_STREAM_NAME},
     utils::{extract_datetime, human_size::bytes_to_human_size},
@@ -697,7 +697,7 @@ impl HotTierManager {
     }
 
     pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> {
-        if !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) {
+        if !self.check_stream_hot_tier_exists(PMETA_STREAM_NAME) {
             let mut stream_hot_tier = StreamHotTier {
                 version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
                 size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES,
@@ -705,7 +705,7 @@ impl HotTierManager {
                 available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES,
                 oldest_date_time_entry: None,
             };
-            self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier)
+            self.put_hot_tier(PMETA_STREAM_NAME, &mut stream_hot_tier)
                 .await?;
         }
         Ok(())
diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs
index d75a81114..c202dd31e 100644
--- a/src/migration/stream_metadata_migration.rs
+++ b/src/migration/stream_metadata_migration.rs
@@ -19,7 +19,7 @@
 use crate::{
     catalog::snapshot::CURRENT_SNAPSHOT_VERSION,
-    handlers::{TelemetryType, http::cluster::INTERNAL_STREAM_NAME},
+    handlers::{TelemetryType, http::cluster::PMETA_STREAM_NAME},
     storage,
 };
 use serde_json::{Value, json};
@@ -160,7 +160,7 @@ pub fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value {
     );
     let stream_type = stream_metadata_map.get("stream_type");
     if stream_type.is_none() {
-        if stream_name.eq(INTERNAL_STREAM_NAME) {
+        if stream_name.eq(PMETA_STREAM_NAME) {
             stream_metadata_map.insert(
                 "stream_type".to_owned(),
                 Value::String(storage::StreamType::Internal.to_string()),
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 243cfaf72..1a0e4374b 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -49,7 +49,9 @@ use crate::{
     handlers::{
         STREAM_TYPE_KEY, TelemetryType,
         http::{
-            cluster::{INTERNAL_STREAM_NAME, sync_streams_with_ingestors},
+            cluster::{
+                BILLING_METRICS_STREAM_NAME, PMETA_STREAM_NAME, sync_streams_with_ingestors,
+            },
             ingest::PostError,
             logstream::error::{CreateStreamError, StreamError},
             modal::{ingest_server::INGESTOR_META, utils::logstream_utils::PutStreamHeaders},
@@ -393,18 +395,38 @@ impl Parseable {
     pub async fn create_internal_stream_if_not_exists(&self) -> Result<(), StreamError> {
         let log_source_entry = LogSourceEntry::new(LogSource::Pmeta, HashSet::new());
 
-        match self
+        let internal_stream_result = self
             .create_stream_if_not_exists(
-                INTERNAL_STREAM_NAME,
+                PMETA_STREAM_NAME,
                 StreamType::Internal,
                 None,
                 vec![log_source_entry],
                 TelemetryType::Logs,
             )
-            .await
-        {
-            Err(_) | Ok(true) => return Ok(()),
-            _ => {}
+            .await;
+
+        let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
+        let billing_stream_result = self
+            .create_stream_if_not_exists(
+                BILLING_METRICS_STREAM_NAME,
+                StreamType::Internal,
+                None,
+                vec![log_source_entry],
+                TelemetryType::Logs,
+            )
+            .await;
+
+        // Check if either stream creation failed
+        if let Err(e) = &internal_stream_result {
+            tracing::error!("Failed to create pmeta stream: {:?}", e);
+        }
+        if let Err(e) = &billing_stream_result {
+            tracing::error!("Failed to create billing stream: {:?}", e);
+        }
+
+        // Check if both streams already existed
+        if matches!(internal_stream_result, Ok(true)) && matches!(billing_stream_result, Ok(true)) {
+            return Ok(());
         }
 
         let mut header_map = HeaderMap::new();
@@ -413,7 +435,23 @@ impl Parseable {
             HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
         );
         header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
+
+        // Sync only the streams that were created successfully
+        if matches!(internal_stream_result, Ok(false))
+            && let Err(e) =
+                sync_streams_with_ingestors(header_map.clone(), Bytes::new(), PMETA_STREAM_NAME)
+                    .await
+        {
+            tracing::error!("Failed to sync pmeta stream with ingestors: {:?}", e);
+        }
+
+        if matches!(billing_stream_result, Ok(false))
+            && let Err(e) =
+                sync_streams_with_ingestors(header_map, Bytes::new(), BILLING_METRICS_STREAM_NAME)
+                    .await
+        {
+            tracing::error!("Failed to sync billing stream with ingestors: {:?}", e);
+        }
 
         Ok(())
     }
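The rewritten `create_internal_stream_if_not_exists` leans on a return contract the diff implies but never states: `Ok(true)` means the stream already existed (nothing to sync), `Ok(false)` means it was just created and must be synced to ingestors, and `Err(_)` is logged without aborting the other stream. A sketch of that dispatch under those assumptions (`needs_ingestor_sync` is a hypothetical helper, not part of the diff):

```rust
// Assumed contract for create_stream_if_not_exists:
//   Ok(true)  -> stream already existed, nothing to sync
//   Ok(false) -> stream newly created, must be synced to ingestors
//   Err(_)    -> creation failed, log and skip the sync
fn needs_ingestor_sync(result: &Result<bool, String>) -> bool {
    matches!(result, Ok(false))
}

fn main() {
    assert!(!needs_ingestor_sync(&Ok(true)));           // already existed
    assert!(needs_ingestor_sync(&Ok(false)));           // just created
    assert!(!needs_ingestor_sync(&Err("boom".into()))); // failed, logged elsewhere
    println!("sync only newly created streams");
}
```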
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index b1abc5e52..d3f38e4ba 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -604,6 +604,15 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         &self,
         stream_name: &str,
     ) -> Result<Bytes, ObjectStorageError> {
+        // if the stream is already loaded in memory, return its metadata
+        // from the metastore instead of recreating it
+        if PARSEABLE.get_stream(stream_name).is_ok() {
+            let stream_metadata_bytes = PARSEABLE
+                .metastore
+                .get_stream_json(stream_name, false)
+                .await
+                .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;
+            return Ok(stream_metadata_bytes);
+        }
         let mut all_log_sources: Vec<LogSourceEntry> = Vec::new();
 
         if let Some(stream_metadata_obs) = PARSEABLE