From a7e473b3902dd25df2e4d7b229a6a0ffda26b818 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 24 Mar 2025 14:39:30 -0400 Subject: [PATCH 1/6] fix: log source format in stream info rename as below - OtelLogs -> otel-logs OtelMetrics -> otel-metrics OtelTraces -> otel-traces Pmeta -> pmeta Json -> json Kinesis -> kinesis added migration steps to perform the rename for existing streams --- src/event/format/mod.rs | 6 ++++ src/migration/mod.rs | 22 ++++++++++++++- src/migration/stream_metadata_migration.rs | 32 ++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index bc0c0218c..da309d3b5 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -59,19 +59,25 @@ type EventSchema = Vec>; #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum LogSource { // AWS Kinesis sends logs in the format of a json array + #[serde(rename = "kinesis")] Kinesis, // OpenTelemetry sends logs according to the specification as explained here // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1 + #[serde(rename = "otel-logs")] OtelLogs, // OpenTelemetry sends traces according to the specification as explained here // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto + #[serde(rename = "otel-traces")] OtelMetrics, // OpenTelemetry sends traces according to the specification as explained here // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1 + #[serde(rename = "otel-metrics")] OtelTraces, // Internal Stream format + #[serde(rename = "pmeta")] Pmeta, #[default] + #[serde(rename = "json")] // Json object or array Json, Custom(String), diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 20c159a63..407ae2ed2 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -246,6 +246,9 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = + stream_metadata_migration::rename_log_source_v6(stream_metadata_value); + storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; @@ -259,6 +262,9 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = + stream_metadata_migration::rename_log_source_v6(stream_metadata_value); + storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; @@ -272,6 +278,9 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = + stream_metadata_migration::rename_log_source_v6(stream_metadata_value); + storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; @@ -279,17 +288,28 @@ async fn migrate_stream_metadata( Some("v4") => { stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = + stream_metadata_migration::rename_log_source_v6(stream_metadata_value); + storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; } Some("v5") => { stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = + stream_metadata_migration::rename_log_source_v6(stream_metadata_value); + storage + .put_object(&path, to_bytes(&stream_metadata_value)) + .await?; + } + _ => { + stream_metadata_value = + stream_metadata_migration::rename_log_source_v6(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; } - _ => (), } Ok(stream_metadata_value) diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index ab29e124b..242bea7d9 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -201,6 +201,38 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value { stream_metadata } +pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value { + let stream_metadata_map: &mut serde_json::Map = + stream_metadata.as_object_mut().unwrap(); + let log_source = stream_metadata_map.get("log_source"); + if let Some(log_source) = log_source { + //rename log_source_format -> + //Kinesis to kinesis + //OtelLogs to otel-logs + //OtelTraces to otel-traces + //OtelMetrics to otel-metrics + //Pmeta to pmeta + //Json to json + + let log_source_entry = serde_json::from_value::(log_source.clone()); + if let Ok(mut log_source_entry) = log_source_entry { + let log_source = log_source_entry.log_source_format.clone(); + let log_source_format = match log_source { + LogSource::Kinesis => "kinesis", + LogSource::OtelLogs => "otel-logs", + LogSource::OtelTraces => "otel-traces", + LogSource::OtelMetrics => "otel-metrics", + LogSource::Pmeta => "pmeta", + LogSource::Json => "json", + _ => "", + }; + log_source_entry.log_source_format = LogSource::from(log_source_format); + stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); + } + } + stream_metadata +} + fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { let manifest_list = snapshot.get("manifest_list").unwrap(); let mut new_manifest_list = Vec::new(); From b6bd571f29678a79dc4d2842049e043906022420 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 24 Mar 2025 22:41:10 -0400 Subject: [PATCH 2/6] update log_source_format --- src/migration/stream_metadata_migration.rs | 47 ++++++++++------------ 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index 242bea7d9..ef7535fd7 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -17,7 +17,7 @@ * */ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use serde_json::{json, Value}; @@ -202,32 +202,27 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value { } pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value { - let stream_metadata_map: &mut serde_json::Map = - stream_metadata.as_object_mut().unwrap(); - let log_source = stream_metadata_map.get("log_source"); - if let Some(log_source) = log_source { - //rename log_source_format -> - //Kinesis to kinesis - //OtelLogs to otel-logs - //OtelTraces to otel-traces - //OtelMetrics to otel-metrics - //Pmeta to pmeta - //Json to json + let mut format_mapping = HashMap::new(); + format_mapping.insert("Kinesis", "kinesis"); + format_mapping.insert("OtelLogs", "otel-logs"); + format_mapping.insert("OtelTraces", "otel-traces"); + format_mapping.insert("OtelMetrics", "otel-metrics"); + format_mapping.insert("Pmeta", "pmeta"); + format_mapping.insert("Json", "json"); - let log_source_entry = serde_json::from_value::(log_source.clone()); - if let Ok(mut log_source_entry) = log_source_entry { - let log_source = log_source_entry.log_source_format.clone(); - let log_source_format = match log_source { - LogSource::Kinesis => "kinesis", - LogSource::OtelLogs => "otel-logs", - LogSource::OtelTraces => "otel-traces", - LogSource::OtelMetrics => "otel-metrics", - LogSource::Pmeta => "pmeta", - LogSource::Json => "json", - _ => "", - }; - log_source_entry.log_source_format = LogSource::from(log_source_format); - stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); + // Transform log_source_format in each log_source entry if it exists + if let Some(log_sources) = stream_metadata + .get_mut("log_source") + .and_then(|v| v.as_array_mut()) + { + for source in log_sources.iter_mut() { + if let Some(format_value) = source.get_mut("log_source_format") { + if let Some(format_str) = format_value.as_str() { + if let Some(new_format) = format_mapping.get(format_str) { + *format_value = json!(new_format); + } + } + } } } stream_metadata From 4dc304f7caa77a6bf2fca3f1866825cb1e7c12e7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 24 Mar 2025 22:49:39 -0400 Subject: [PATCH 3/6] add test --- src/migration/stream_metadata_migration.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index ef7535fd7..5702b505f 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -295,4 +295,12 @@ mod tests { let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } + + #[test] + fn test_rename_log_source_v6() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"OtelTraces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"OtelMetrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]}); + let expected = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"otel-traces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"otel-metrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]}); + let updated_stream_metadata = super::rename_log_source_v6(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } } From 0baf20fbb99f98e21bb4b5520674d1b762366b7d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 24 Mar 2025 23:18:10 -0400 Subject: [PATCH 4/6] correct rename --- src/event/format/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index da309d3b5..c5fc17e1d 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -67,11 +67,11 @@ pub enum LogSource { OtelLogs, // OpenTelemetry sends traces according to the specification as explained here // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto - #[serde(rename = "otel-traces")] + #[serde(rename = "otel-metrics")] OtelMetrics, // OpenTelemetry sends traces according to the specification as explained here // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1 - #[serde(rename = "otel-metrics")] + #[serde(rename = "otel-traces")] OtelTraces, // Internal Stream format #[serde(rename = "pmeta")] From ba7c3a7c619c01a8996ad3438434e780f3b9d839 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 25 Mar 2025 03:00:23 -0400 Subject: [PATCH 5/6] fix migration --- src/migration/mod.rs | 10 ---- src/migration/stream_metadata_migration.rs | 54 ++++++++++++++++------ 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 407ae2ed2..d101879cf 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -246,8 +246,6 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::rename_log_source_v6(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -262,8 +260,6 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::rename_log_source_v6(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -278,8 +274,6 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::rename_log_source_v6(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -288,8 +282,6 @@ async fn migrate_stream_metadata( Some("v4") => { stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::rename_log_source_v6(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -297,8 +289,6 @@ async fn migrate_stream_metadata( } Some("v5") => { stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::rename_log_source_v6(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index 5702b505f..12970949b 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -17,14 +17,12 @@ * */ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use serde_json::{json, Value}; use crate::{ - catalog::snapshot::CURRENT_SNAPSHOT_VERSION, - event::format::{LogSource, LogSourceEntry}, - handlers::http::cluster::INTERNAL_STREAM_NAME, + catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME, storage, }; @@ -191,13 +189,41 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value { "version".to_owned(), Value::String(storage::CURRENT_SCHEMA_VERSION.into()), ); - let mut log_source_entry = LogSourceEntry::default(); - if let Some(log_source) = stream_metadata_map.get("log_source") { - if let Ok(log_source) = serde_json::from_value::(log_source.clone()) { - log_source_entry = LogSourceEntry::new(log_source, HashSet::new()); + if let Some(log_source) = stream_metadata_map.remove("log_source") { + if let Some(format_str) = log_source.as_str() { + let transformed_format = match format_str { + "Kinesis" => "kinesis", + "OtelLogs" => "otel-logs", + "OtelTraces" => "otel-traces", + "OtelMetrics" => "otel-metrics", + "Pmeta" => "pmeta", + "Json" => "json", + _ => "json", + }; + + let log_source_entry = json!({ + "log_source_format": transformed_format, + "fields": [] + }); + + stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); + } else { + let default_entry = json!({ + "log_source_format": "json", + "fields": [] + }); + + stream_metadata_map.insert("log_source".to_owned(), json!([default_entry])); } + } else { + let default_entry = json!({ + "log_source_format": "json", + "fields": [] + }); + + stream_metadata_map.insert("log_source".to_owned(), json!([default_entry])); } - stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); + stream_metadata } @@ -259,7 +285,7 @@ mod tests { #[test] fn test_v5_v6_with_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"OtelLogs"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":[]}]}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -267,7 +293,7 @@ mod tests { #[test] fn test_v5_v6_with_default_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Json"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -275,7 +301,7 @@ mod tests { #[test] fn test_v5_v6_without_log_source() { let stream_metadata = serde_json::json!({"version":"v4","schema_version":"v0","objectstore-format":"v4","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -283,7 +309,7 @@ mod tests { #[test] fn test_v5_v6_unknown_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Invalid"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -291,7 +317,7 @@ mod tests { #[test] fn test_v5_v6_invalid_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":{"log_source": "Invalid"}}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } From 34e805989c173d6ae8c4fba4949385c81f65f5dc Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 25 Mar 2025 03:25:51 -0400 Subject: [PATCH 6/6] refactor --- src/migration/stream_metadata_migration.rs | 88 ++++++++++++---------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index 12970949b..cab98258a 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -179,8 +179,8 @@ pub fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value { } pub fn v5_v6(mut stream_metadata: Value) -> Value { - let stream_metadata_map: &mut serde_json::Map = - stream_metadata.as_object_mut().unwrap(); + let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); + stream_metadata_map.insert( "objectstore-format".to_owned(), Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), @@ -189,54 +189,52 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value { "version".to_owned(), Value::String(storage::CURRENT_SCHEMA_VERSION.into()), ); - if let Some(log_source) = stream_metadata_map.remove("log_source") { - if let Some(format_str) = log_source.as_str() { - let transformed_format = match format_str { - "Kinesis" => "kinesis", - "OtelLogs" => "otel-logs", - "OtelTraces" => "otel-traces", - "OtelMetrics" => "otel-metrics", - "Pmeta" => "pmeta", - "Json" => "json", - _ => "json", - }; - let log_source_entry = json!({ - "log_source_format": transformed_format, - "fields": [] - }); + // Transform or add log_source + let log_source_entry = match stream_metadata_map.remove("log_source") { + Some(log_source) => transform_log_source(log_source), + None => default_log_source_entry(), + }; - stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); - } else { - let default_entry = json!({ - "log_source_format": "json", - "fields": [] - }); + stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); - stream_metadata_map.insert("log_source".to_owned(), json!([default_entry])); - } - } else { - let default_entry = json!({ - "log_source_format": "json", + stream_metadata +} + +fn transform_log_source(log_source: Value) -> Value { + if let Some(format_str) = log_source.as_str() { + let transformed_format = map_log_source_format(format_str); + json!({ + "log_source_format": transformed_format, "fields": [] - }); + }) + } else { + default_log_source_entry() + } +} - stream_metadata_map.insert("log_source".to_owned(), json!([default_entry])); +fn map_log_source_format(format_str: &str) -> &str { + match format_str { + "Kinesis" => "kinesis", + "OtelLogs" => "otel-logs", + "OtelTraces" => "otel-traces", + "OtelMetrics" => "otel-metrics", + "Pmeta" => "pmeta", + "Json" => "json", + _ => "json", } +} - stream_metadata +fn default_log_source_entry() -> Value { + json!({ + "log_source_format": "json", + "fields": [] + }) } pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value { - let mut format_mapping = HashMap::new(); - format_mapping.insert("Kinesis", "kinesis"); - format_mapping.insert("OtelLogs", "otel-logs"); - format_mapping.insert("OtelTraces", "otel-traces"); - format_mapping.insert("OtelMetrics", "otel-metrics"); - format_mapping.insert("Pmeta", "pmeta"); - format_mapping.insert("Json", "json"); + let format_mapping = create_format_mapping(); - // Transform log_source_format in each log_source entry if it exists if let Some(log_sources) = stream_metadata .get_mut("log_source") .and_then(|v| v.as_array_mut()) @@ -251,9 +249,21 @@ pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value { } } } + stream_metadata } +fn create_format_mapping() -> HashMap<&'static str, &'static str> { + HashMap::from([ + ("Kinesis", "kinesis"), + ("OtelLogs", "otel-logs"), + ("OtelTraces", "otel-traces"), + ("OtelMetrics", "otel-metrics"), + ("Pmeta", "pmeta"), + ("Json", "json"), + ]) +} + fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { let manifest_list = snapshot.get("manifest_list").unwrap(); let mut new_manifest_list = Vec::new();