From 365822af6a5b7f1bee01ce58fcb2b0ad1ca922f3 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 21 Mar 2023 11:20:04 +0530 Subject: [PATCH 01/14] Refactor Event Ingestion * Add EventFormat Trait * Add support for batchinig array of object * Derive sub schema for event instead of fetching from hashed key * Defer update of schema until sync process * Change alert to use recordbatch instead * Update stats from bytes and number of objects in event. --- Cargo.lock | 398 +++++++++++++++++------ server/Cargo.toml | 10 +- server/src/alerts/mod.rs | 58 ++-- server/src/alerts/rule.rs | 218 +++++++------ server/src/alerts/target.rs | 21 +- server/src/event.rs | 339 ++++--------------- server/src/event/format.rs | 116 +++++++ server/src/event/format/json.rs | 138 ++++++++ server/src/event/writer.rs | 5 +- server/src/handlers/http.rs | 4 +- server/src/handlers/http/ingest.rs | 105 +++--- server/src/handlers/http/logstream.rs | 6 +- server/src/metadata.rs | 84 +++-- server/src/migration/schema_migration.rs | 2 +- server/src/option.rs | 16 +- server/src/query.rs | 4 +- server/src/query/table_provider.rs | 2 +- server/src/stats.rs | 8 +- server/src/storage.rs | 9 +- server/src/storage/object_storage.rs | 175 +++++----- server/src/storage/s3.rs | 9 +- server/src/storage/store_metadata.rs | 5 +- server/src/utils.rs | 1 + server/src/utils/arrow.rs | 57 ++++ server/src/utils/json.rs | 17 +- server/src/utils/json/flatten.rs | 38 ++- server/src/validator.rs | 6 +- 27 files changed, 1069 insertions(+), 782 deletions(-) create mode 100644 server/src/event/format.rs create mode 100644 server/src/event/format/json.rs create mode 100644 server/src/utils/arrow.rs diff --git a/Cargo.lock b/Cargo.lock index 25fa460d8..3b7e62f2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,7 +94,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -378,22 +378,22 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945" +checksum = "f410d3907b6b3647b9e7bca4551274b2e3d716aa940afb67b7287257401da921" dependencies = [ "ahash 0.8.3", "arrow-arith", - "arrow-array", - "arrow-buffer", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", "arrow-cast", "arrow-csv", - "arrow-data", + "arrow-data 34.0.0", "arrow-ipc", "arrow-json", "arrow-ord", "arrow-row", - "arrow-schema", + "arrow-schema 34.0.0", "arrow-select", "arrow-string", "comfy-table", @@ -401,14 +401,14 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068" +checksum = "f87391cf46473c9bc53dab68cb8872c3a81d4dfd1703f1c8aa397dba9880a043" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "chrono", "half", "num", @@ -416,25 +416,52 @@ dependencies = [ [[package]] name = "arrow-array" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1" +checksum = "d35d5475e65c57cffba06d0022e3006b677515f99b54af33a7cd54f6cdd4a5b5" dependencies = [ "ahash 0.8.3", - "arrow-buffer", - 
"arrow-data", - "arrow-schema", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "chrono", "half", "hashbrown 0.13.2", "num", ] +[[package]] +name = "arrow-array" +version = "35.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43489bbff475545b78b0e20bde1d22abd6c99e54499839f9e815a2fa5134a51b" +dependencies = [ + "ahash 0.8.3", + "arrow-buffer 35.0.0", + "arrow-data 35.0.0", + "arrow-schema 35.0.0", + "chrono", + "chrono-tz", + "half", + "hashbrown 0.13.2", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "34.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b4ec72eda7c0207727df96cf200f539749d736b21f3e782ece113e18c1a0a7" +dependencies = [ + "half", + "num", +] + [[package]] name = "arrow-buffer" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748" +checksum = "a3759e4a52c593281184787af5435671dc8b1e78333e5a30242b2e2d6e3c9d1f" dependencies = [ "half", "num", @@ -442,14 +469,14 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04" +checksum = "0a7285272c9897321dfdba59de29f5b05aeafd3cdedf104a941256d155f6d304" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "arrow-select", "chrono", "lexical-core", @@ -458,15 +485,15 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448" +checksum = "981ee4e7f6a120da04e00d0b39182e1eeacccb59c8da74511de753c56b7fddf7" dependencies = [ - "arrow-array", - "arrow-buffer", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "chrono", "csv", "csv-core", @@ -477,109 +504,128 @@ dependencies = [ [[package]] name = "arrow-data" -version = "31.0.0" +version = "34.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27cc673ee6989ea6e4b4e8c7d461f7e06026a096c8f0b1a7288885ff71ae1e56" +dependencies = [ + "arrow-buffer 34.0.0", + "arrow-schema 34.0.0", + "half", + "num", +] + +[[package]] +name = "arrow-data" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477" +checksum = "19c7787c6cdbf9539b1ffb860bfc18c5848926ec3d62cbd52dc3b1ea35c874fd" dependencies = [ - "arrow-buffer", - "arrow-schema", + "arrow-buffer 35.0.0", + "arrow-schema 35.0.0", "half", "num", ] [[package]] name = "arrow-ipc" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190" +checksum = "e37b8b69d9e59116b6b538e8514e0ec63a30f08b617ce800d31cb44e3ef64c1a" dependencies = [ - "arrow-array", - "arrow-buffer", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "flatbuffers", ] [[package]] name = "arrow-json" -version = "31.0.0" +version = "34.0.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a" +checksum = "80c3fa0bed7cfebf6d18e46b733f9cb8a1cb43ce8e6539055ca3e1e48a426266" dependencies = [ - "arrow-array", - "arrow-buffer", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "chrono", "half", "indexmap", + "lexical-core", "num", "serde_json", ] [[package]] name = "arrow-ord" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3" +checksum = "d247dce7bed6a8d6a3c6debfa707a3a2f694383f0c692a39d736a593eae5ef94" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "arrow-select", "num", ] [[package]] name = "arrow-row" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b" +checksum = "8d609c0181f963cea5c70fddf9a388595b5be441f3aa1d1cdbf728ca834bbd3a" dependencies = [ "ahash 0.8.3", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "half", "hashbrown 0.13.2", ] [[package]] name = "arrow-schema" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723" +checksum = "64951898473bfb8e22293e83a44f02874d2257514d49cd95f9aa4afcff183fbc" dependencies = [ "serde", ] +[[package]] +name = "arrow-schema" +version = "35.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf6b26f6a6f8410e3b9531cbd1886399b99842701da77d4b4cf2013f7708f20f" + [[package]] name = "arrow-select" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c" +checksum = "2a513d89c2e1ac22b28380900036cf1f3992c6443efc5e079de631dcf83c6888" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "num", ] [[package]] name = "arrow-string" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8" +checksum = "5288979b2705dae1114c864d73150629add9153b9b8f1d7ee3963db94c372ba5" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", + "arrow-data 34.0.0", + "arrow-schema 34.0.0", "arrow-select", "regex", "regex-syntax", @@ -616,6 +662,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", + "zstd 0.11.2+zstd.1.5.2", + "zstd-safe 5.0.2+zstd.1.5.2", ] [[package]] @@ -1158,6 +1206,28 @@ dependencies = [ "chrono", ] +[[package]] +name = "chrono-tz" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa48fa079165080f11d7753fd0bc175b7d391f276b965fe4b55bfad67856e463" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" 
+version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9998fb9f7e9b2111641485bf8beb32f92945f97f92a3d061f744cfef335f751" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "clap" version = "3.2.23" @@ -1535,9 +1605,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "17.0.0" +version = "21.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d90cae91414aaeda37ae8022a23ef1124ca8efc08ac7d7770274249f7cf148" +checksum = "c187589ce9ddf0bbc90e2e3dc0a89b90cc3d4bfdeefc7cf2aaa8ac15f7725811" dependencies = [ "ahash 0.8.3", "arrow", @@ -1548,6 +1618,7 @@ dependencies = [ "chrono", "dashmap", "datafusion-common", + "datafusion-execution", "datafusion-expr", "datafusion-optimizer", "datafusion-physical-expr", @@ -1565,7 +1636,6 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "paste", "percent-encoding", "pin-project-lite", "rand", @@ -1578,15 +1648,17 @@ dependencies = [ "url", "uuid", "xz2", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] name = "datafusion-common" -version = "17.0.0" +version = "21.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b21c4b8e8b7815e86d79d25da16854fee6d4d1b386572e802a248b7d43188e86" +checksum = "ecbbfb88a799beca6a361c1282795f0f185b96201dab496d733a49bdf4684f7f" dependencies = [ "arrow", + "arrow-array 35.0.0", "chrono", "num_cpus", "object_store", @@ -1594,24 +1666,41 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-execution" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73a38825b879024a87937b3b5ea8e43287ab3432db8786a2839dcbf141b6d938" +dependencies = [ + "dashmap", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.13.2", + "log", + "object_store", + "parking_lot", + "rand", + "tempfile", + "url", +] + [[package]] name = "datafusion-expr" -version = "17.0.0" +version = "21.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db8c07b051fbaf01657a3eb910a76b042ecfed0350a40412f70cf6b949bd5328" +checksum = "05454741d8496faf9f433a666e97ce693807e8374e0fd513eda5a8218ba8456d" dependencies = [ "ahash 0.8.3", "arrow", "datafusion-common", - "log", "sqlparser", ] [[package]] name = "datafusion-optimizer" -version = "17.0.0" +version = "21.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2ce4d34a808cd2e4c4864cdc759dd1bd22dcac2b8af38aa570e30fd54577c4d" +checksum = "d5d551c428b8557790cceecb59615f624b24dddf60b4d843c5994f8120b48c7f" dependencies = [ "arrow", "async-trait", @@ -1620,20 +1709,21 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.13.2", + "itertools", "log", "regex-syntax", ] [[package]] name = "datafusion-physical-expr" -version = "17.0.0" +version = "21.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a38afa11a09505c24bd7e595039d7914ec39329ba490209413ef2d37895c8220" +checksum = "08aa1047edf92d59f97b18dfbb1cade176a970b1a98b0a27f909409ceb05906e" dependencies = [ "ahash 0.8.3", "arrow", - "arrow-buffer", - "arrow-schema", + "arrow-buffer 34.0.0", + "arrow-schema 34.0.0", "blake2", "blake3", "chrono", @@ -1646,8 +1736,8 @@ dependencies = [ "itertools", "lazy_static", "md-5", - "num-traits", "paste", + "petgraph", "rand", "regex", "sha2", @@ -1657,9 +1747,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "17.0.0" +version = "21.0.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "9172411b25ff4aa97f8e99884898595a581636d93cc96c12f96dbe3bf51cd7e5" +checksum = "2fc83ac8761c251617c1b7e1122adf79ebbf215ecabc4e2346cda1c4307d5152" dependencies = [ "arrow", "datafusion-common", @@ -1669,11 +1759,11 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "17.0.0" +version = "21.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbe5e61563ced2f6992a60afea568ff3de69e32940bbf07db06fc5c9d8cd866" +checksum = "46d6cbfa8c6ac06202badbac6e4675c33b91d299f711a4fee23327b83906e2ee" dependencies = [ - "arrow-schema", + "arrow-schema 34.0.0", "datafusion-common", "datafusion-expr", "log", @@ -1827,14 +1917,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flatbuffers" -version = "22.9.29" +version = "23.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70" +checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619" dependencies = [ "bitflags", - "thiserror", + "rustc_version", ] [[package]] @@ -2963,17 +3059,17 @@ dependencies = [ [[package]] name = "parquet" -version = "31.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45" +checksum = "7ac135ecf63ebb5f53dda0921b0b76d6048b3ef631a5f4760b9e8f863ff00cfa" dependencies = [ "ahash 0.8.3", - "arrow-array", - "arrow-buffer", + "arrow-array 34.0.0", + "arrow-buffer 34.0.0", "arrow-cast", - "arrow-data", + "arrow-data 34.0.0", "arrow-ipc", - "arrow-schema", + "arrow-schema 34.0.0", "arrow-select", "base64 0.21.0", "brotli", @@ -2991,7 +3087,16 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.12.3+zstd.1.5.2", +] + +[[package]] +name = "parse-zoneinfo" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41" +dependencies = [ + "regex", ] [[package]] @@ -3005,7 +3110,10 @@ dependencies = [ "actix-web-prometheus", "actix-web-static-files", "anyhow", - "arrow-schema", + "arrow-array 34.0.0", + "arrow-ipc", + "arrow-json", + "arrow-schema 34.0.0", "async-trait", "aws-sdk-s3", "aws-smithy-async", @@ -3035,6 +3143,7 @@ dependencies = [ "num_cpus", "object_store", "once_cell", + "parquet", "prometheus", "pyroscope", "pyroscope_pprofrs", @@ -3091,6 +3200,54 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "petgraph" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "phf" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56ac890c5e3ca598bbdeaa99964edb5b0258a583a9eb6ef4e89fc85d9224770" 
+dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.0.12" @@ -3775,6 +3932,12 @@ dependencies = [ "outref", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.7" @@ -3836,9 +3999,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "sqlparser" -version = "0.30.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b" +checksum = "0366f270dbabb5cc2e4c88427dc4c08bba144f81e32fbd459a013f26a4d16aa0" dependencies = [ "log", "sqlparser_derive", @@ -4789,13 +4952,32 @@ dependencies = [ "flate2", ] +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe 5.0.2+zstd.1.5.2", +] + [[package]] name = "zstd" version = "0.12.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" dependencies = [ - "zstd-safe", + "zstd-safe 6.0.4+zstd.1.5.4", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index ffee1de57..0d79fa48c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,7 +14,10 @@ actix-files = "0.6" actix-web-prometheus = { version = "0.1" } prometheus = { version = "0.13", features = ["process"] } anyhow = { version = "1.0", features = ["backtrace"] } -arrow-schema = { version = "31.0", features = ["serde"] } +arrow-schema = { version = "34.0.0", features = ["serde"] } +arrow-array = { version = "34.0.0" } +arrow-json = "34.0.0" +arrow-ipc = "34.0.0" async-trait = "0.1" aws-sdk-s3 = "0.24" aws-smithy-async = { version = "0.54", features = ["rt-tokio"] } @@ -32,7 +35,7 @@ clap = { version = "4.1", default-features = false, features = [ "error-context", ] } crossterm = "0.26" -datafusion = "17" +datafusion = "21.0.0" object_store = { version = "0.5", features = ["aws"] } derive_more = "0.99" env_logger = "0.10" @@ -64,7 +67,7 @@ tokio = { version = "1.25", default-features = false, features = [ ] } clokwerk = "0.4" actix-web-static-files = "4.0" -static-files = "0.2" +static-files = "0.2" ulid = { version = "1.0", features = ["serde"] } hex = "0.4" itertools = "0.10" @@ -72,6 +75,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] } xz2 = { version = "*", features=["static"] } bzip2 = { version = "*", features=["static"] } once_cell = "1.17.1" +parquet = "34.0.0" pyroscope = { 
version = "0.5.3", optional = true } pyroscope_pprofrs = { version = "0.2", optional = true } uptime_lib = "0.2.2" diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index e89d15f36..e62269c21 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -16,7 +16,11 @@ * */ +use arrow_array::cast::as_string_array; +use arrow_array::RecordBatch; +use arrow_schema::DataType; use async_trait::async_trait; +use datafusion::arrow::compute::kernels::cast; use datafusion::arrow::datatypes::Schema; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -33,21 +37,21 @@ use crate::{storage, utils}; pub use self::rule::Rule; use self::target::Target; -#[derive(Default, Debug, Serialize, Deserialize)] +#[derive(Default, Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct Alerts { pub version: AlertVerison, pub alerts: Vec, } -#[derive(Default, Debug, Serialize, Deserialize)] +#[derive(Default, Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] pub enum AlertVerison { #[default] V1, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct Alert { #[serde(default = "crate::utils::uid::gen")] @@ -60,22 +64,29 @@ pub struct Alert { } impl Alert { - pub fn check_alert(&self, stream_name: String, event_json: &serde_json::Value) { - let resolves = self.rule.resolves(event_json); + pub fn check_alert(&self, stream_name: String, events: RecordBatch) { + let resolves = self.rule.resolves(events.clone()); - match resolves { - AlertState::Listening | AlertState::Firing => (), - alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => { - let context = self.get_context(stream_name, alert_state, &self.rule, event_json); - ALERTS_STATES - .with_label_values(&[ - context.stream.as_str(), - context.alert_info.alert_name.as_str(), - context.alert_info.alert_state.to_string().as_str(), - ]) - .inc(); - for target in &self.targets { - target.call(context.clone()); + for (index, state) in resolves.into_iter().enumerate() { + match state { + AlertState::Listening | AlertState::Firing => (), + alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => { + let context = self.get_context( + stream_name.clone(), + alert_state, + &self.rule, + events.slice(index, 1), + ); + ALERTS_STATES + .with_label_values(&[ + context.stream.as_str(), + context.alert_info.alert_name.as_str(), + context.alert_info.alert_state.to_string().as_str(), + ]) + .inc(); + for target in &self.targets { + target.call(context.clone()); + } } } } @@ -86,7 +97,7 @@ impl Alert { stream_name: String, alert_state: AlertState, rule: &Rule, - event_json: &serde_json::Value, + event_row: RecordBatch, ) -> Context { let deployment_instance = format!( "{}://{}", @@ -104,7 +115,7 @@ impl Alert { stream_name, AlertInfo::new( self.name.clone(), - self.message.get(event_json), + self.message.get(event_row), rule.trigger_reason(), alert_state, ), @@ -144,9 +155,12 @@ impl Message { } // returns the message with the column name replaced with the value of the column - fn get(&self, event_json: &serde_json::Value) -> String { + fn get(&self, event: RecordBatch) -> String { if let Some(column) = self.extract_column_name() { - if let Some(value) = event_json.get(column) { + if let Some(value) = event.column_by_name(column) { + let arr = cast(value, &DataType::Utf8).unwrap(); + let value = as_string_array(&arr).value(0); + return self .message 
.replace(&format!("{{{column}}}"), value.to_string().as_str()); diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs index 2eb5722a0..cd8c614b5 100644 --- a/server/src/alerts/rule.rs +++ b/server/src/alerts/rule.rs @@ -16,8 +16,8 @@ * */ +use arrow_array::{cast::as_string_array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Schema}; -use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicU32, Ordering}; use self::base::{ @@ -27,7 +27,7 @@ use self::base::{ use super::AlertState; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(tag = "type", content = "config")] #[serde(rename_all = "camelCase")] pub enum Rule { @@ -35,7 +35,7 @@ pub enum Rule { } impl Rule { - pub fn resolves(&self, event: &serde_json::Value) -> AlertState { + pub fn resolves(&self, event: RecordBatch) -> Vec { match self { Rule::Column(rule) => rule.resolves(event), } @@ -54,7 +54,7 @@ impl Rule { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(untagged)] pub enum ColumnRule { ConsecutiveNumeric(ConsecutiveNumericRule), @@ -62,7 +62,7 @@ pub enum ColumnRule { } impl ColumnRule { - fn resolves(&self, event: &serde_json::Value) -> AlertState { + fn resolves(&self, event: RecordBatch) -> Vec { match self { Self::ConsecutiveNumeric(rule) => rule.resolves(event), Self::ConsecutiveString(rule) => rule.resolves(event), @@ -120,7 +120,6 @@ impl ColumnRule { NumericOperator::GreaterThanEquals => "greater than or equal to", NumericOperator::LessThan => "less than", NumericOperator::LessThanEquals => "less than or equal to", - NumericOperator::Regex => "matches regex", }, value, repeats @@ -154,7 +153,7 @@ impl ColumnRule { // Rules for alerts -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct ConsecutiveNumericRule { #[serde(flatten)] @@ -164,20 +163,27 @@ pub struct ConsecutiveNumericRule { } impl ConsecutiveNumericRule { - fn resolves(&self, event: &serde_json::Value) -> AlertState { - if let Some(resolved) = self.base_rule.resolves(event) { - if resolved { - self.state.update_and_fetch_state() - } else { - self.state.fetch_state() - } - } else { - self.state.existing_state() - } + fn resolves(&self, event: RecordBatch) -> Vec { + let Some(column) = event.column_by_name(&self.base_rule.column) else { + return Vec::new(); + }; + + let base_matches = self.base_rule.resolves(column); + + base_matches + .into_iter() + .map(|matches| { + if matches { + self.state.update_and_fetch_state() + } else { + self.state.fetch_state() + } + }) + .collect() } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct ConsecutiveStringRule { #[serde(flatten)] @@ -187,16 +193,23 @@ pub struct ConsecutiveStringRule { } impl ConsecutiveStringRule { - fn resolves(&self, event: &serde_json::Value) -> AlertState { - if let Some(resolved) = self.base_rule.resolves(event) { - if resolved { - self.state.update_and_fetch_state() - } else { - self.state.fetch_state() - } - } else { - self.state.existing_state() - } + fn resolves(&self, event: RecordBatch) -> Vec { + let Some(column) = event.column_by_name(&self.base_rule.column) else { + return Vec::new(); + }; + + let base_matches = self.base_rule.resolves(as_string_array(column)); + + base_matches + .into_iter() + .map(|matches| { + if matches { + 
self.state.update_and_fetch_state() + } else { + self.state.fetch_state() + } + }) + .collect() } } @@ -204,7 +217,7 @@ fn one() -> u32 { 1 } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ConsecutiveRepeatState { #[serde(default = "one")] pub repeats: u32, @@ -221,15 +234,6 @@ impl ConsecutiveRepeatState { self._fetch_state(false) } - fn existing_state(&self) -> AlertState { - let repeated = self.repeated.load(Ordering::Acquire); - if repeated >= self.repeats { - AlertState::Firing - } else { - AlertState::Listening - } - } - fn _fetch_state(&self, update: bool) -> AlertState { let mut repeated = self.repeated.load(Ordering::Acquire); let mut state = AlertState::Listening; @@ -294,11 +298,17 @@ mod tests { } pub mod base { + use arrow_array::{ + cast::as_primitive_array, + types::{Float64Type, Int64Type, UInt64Type}, + Array, ArrowPrimitiveType, PrimitiveArray, StringArray, + }; + use itertools::Itertools; + use self::ops::{NumericOperator, StringOperator}; use regex::Regex; - use serde::{Deserialize, Serialize}; - #[derive(Debug, Serialize, Deserialize)] + #[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct NumericRule { pub column: String, @@ -309,38 +319,50 @@ pub mod base { } impl NumericRule { - pub fn resolves(&self, event: &serde_json::Value) -> Option { - let number = match event.get(&self.column)? { - serde_json::Value::Number(number) => number, - _ => unreachable!("right rule is set for right column type"), - }; - - let res = match self.operator { - NumericOperator::EqualTo => number == &self.value, - NumericOperator::NotEqualTo => number != &self.value, - NumericOperator::GreaterThan => { - number.as_f64().unwrap() > self.value.as_f64().unwrap() - } - NumericOperator::GreaterThanEquals => { - number.as_f64().unwrap() >= self.value.as_f64().unwrap() - } - NumericOperator::LessThan => { - number.as_f64().unwrap() < self.value.as_f64().unwrap() - } - NumericOperator::LessThanEquals => { - number.as_f64().unwrap() <= self.value.as_f64().unwrap() - } - NumericOperator::Regex => { - let re: Regex = regex::Regex::new(&self.value.to_string()).unwrap(); - re.is_match(&number.to_string()) - } - }; + pub fn resolves(&self, event: &dyn Array) -> Vec { + let datatype = event.data_type(); + match datatype { + arrow_schema::DataType::Int64 => Self::eval_op( + self.operator, + self.value.as_i64().unwrap(), + as_primitive_array::(event), + ), + arrow_schema::DataType::UInt64 => Self::eval_op( + self.operator, + self.value.as_u64().unwrap(), + as_primitive_array::(event), + ), + arrow_schema::DataType::Float64 => Self::eval_op( + self.operator, + self.value.as_f64().unwrap(), + as_primitive_array::(event), + ), + _ => unreachable!(), + } + } - Some(res) + fn eval_op( + op: NumericOperator, + value: T::Native, + arr: &PrimitiveArray, + ) -> Vec { + arr.iter() + .map(|number| { + let Some(number) = number else { return false }; + match op { + NumericOperator::EqualTo => number == value, + NumericOperator::NotEqualTo => number != value, + NumericOperator::GreaterThan => number > value, + NumericOperator::GreaterThanEquals => number >= value, + NumericOperator::LessThan => number < value, + NumericOperator::LessThanEquals => number <= value, + } + }) + .collect() } } - #[derive(Debug, Serialize, Deserialize)] + #[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct StringRule { pub column: String, @@ -351,48 +373,54 @@ pub mod base { } impl 
StringRule { - pub fn resolves(&self, event: &serde_json::Value) -> Option { - let string = match event.get(&self.column)? { - serde_json::Value::String(s) => s, - _ => unreachable!("right rule is set for right column type"), - }; - - let res = if self.ignore_case.unwrap_or_default() { - match self.operator { - StringOperator::Exact => string.eq_ignore_ascii_case(&self.value), - StringOperator::NotExact => !string.eq_ignore_ascii_case(&self.value), + pub fn resolves(&self, event: &StringArray) -> Vec { + event + .iter() + .map(|string| { + let Some(string) = string else { return false }; + Self::matches( + self.operator, + string, + &self.value, + self.ignore_case.unwrap_or_default(), + ) + }) + .collect_vec() + } + + fn matches(op: StringOperator, string: &str, value: &str, ignore_case: bool) -> bool { + if ignore_case { + match op { + StringOperator::Exact => string.eq_ignore_ascii_case(value), + StringOperator::NotExact => !string.eq_ignore_ascii_case(value), StringOperator::Contains => string .to_ascii_lowercase() - .contains(&self.value.to_ascii_lowercase()), + .contains(&value.to_ascii_lowercase()), StringOperator::NotContains => !string .to_ascii_lowercase() - .contains(&self.value.to_ascii_lowercase()), + .contains(&value.to_ascii_lowercase()), StringOperator::Regex => { - let re: Regex = regex::Regex::new(&self.value).unwrap(); + let re: Regex = regex::Regex::new(value).unwrap(); re.is_match(string) } } } else { - match self.operator { - StringOperator::Exact => string.eq(&self.value), - StringOperator::NotExact => !string.eq(&self.value), - StringOperator::Contains => string.contains(&self.value), - StringOperator::NotContains => !string.contains(&self.value), + match op { + StringOperator::Exact => string.eq(value), + StringOperator::NotExact => !string.eq(value), + StringOperator::Contains => string.contains(value), + StringOperator::NotContains => !string.contains(value), StringOperator::Regex => { - let re: Regex = regex::Regex::new(&self.value).unwrap(); + let re: Regex = regex::Regex::new(value).unwrap(); re.is_match(string) } } - }; - - Some(res) + } } } pub mod ops { - use serde::{Deserialize, Serialize}; - - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub enum NumericOperator { #[serde(alias = "=")] @@ -407,8 +435,6 @@ pub mod base { LessThan, #[serde(alias = "<=")] LessThanEquals, - #[serde(alias = "~")] - Regex, } impl Default for NumericOperator { @@ -417,7 +443,7 @@ pub mod base { } } - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub enum StringOperator { #[serde(alias = "=")] diff --git a/server/src/alerts/target.rs b/server/src/alerts/target.rs index 8e67f85c1..a3672d8df 100644 --- a/server/src/alerts/target.rs +++ b/server/src/alerts/target.rs @@ -28,13 +28,12 @@ use chrono::Utc; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; use reqwest::ClientBuilder; -use serde::{Deserialize, Serialize}; use crate::utils::json; use super::{AlertState, CallableTarget, Context}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] #[serde(untagged)] pub enum Retry { @@ -42,7 +41,7 @@ pub enum Retry { Finite(usize), } -#[derive(Debug, Serialize, 
Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] #[serde(try_from = "TargetVerifier")] pub struct Target { @@ -143,13 +142,13 @@ fn call_target(target: TargetType, context: Context) { actix_web::rt::spawn(async move { target.call(&context).await }); } -#[derive(Debug, Deserialize)] +#[derive(Debug, serde::Deserialize)] pub struct RepeatVerifier { interval: Option, times: Option, } -#[derive(Debug, Deserialize)] +#[derive(Debug, serde::Deserialize)] #[serde(rename_all = "lowercase")] pub struct TargetVerifier { #[serde(flatten)] @@ -192,7 +191,7 @@ impl TryFrom for Target { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] #[serde(deny_unknown_fields)] @@ -217,7 +216,7 @@ fn default_client_builder() -> ClientBuilder { ClientBuilder::new() } -#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct SlackWebHook { endpoint: String, } @@ -245,7 +244,7 @@ impl CallableTarget for SlackWebHook { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "snake_case")] pub struct OtherWebHook { endpoint: String, @@ -283,7 +282,7 @@ impl CallableTarget for OtherWebHook { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct AlertManager { endpoint: String, #[serde(default)] @@ -362,7 +361,7 @@ impl CallableTarget for AlertManager { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Timeout { #[serde(with = "humantime_serde")] pub interval: Duration, @@ -388,7 +387,7 @@ pub struct TimeoutState { pub awaiting_resolve: bool, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Auth { username: String, password: String, diff --git a/server/src/event.rs b/server/src/event.rs index 701f460e3..6617bbda3 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -17,67 +17,49 @@ * */ +pub mod format; mod writer; -use chrono::Utc; -use datafusion::arrow::array::{Array, TimestampMillisecondArray}; -use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; -use datafusion::arrow::error::ArrowError; -use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions}; -use datafusion::arrow::record_batch::RecordBatch; -use serde_json::Value; +use arrow_array::RecordBatch; +use arrow_schema::{Field, Schema}; use std::collections::HashMap; use std::ops::DerefMut; use std::sync::Arc; use crate::metadata; -use crate::metadata::LOCK_EXPECT; -use crate::option::CONFIG; use self::error::EventError; pub use self::writer::STREAM_WRITERS; const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; +const DEFAULT_TAGS_KEY: &str = "p_tags"; +const DEFAULT_METADATA_KEY: &str = "p_metadata"; #[derive(Clone)] pub struct Event { - pub body: Value, pub stream_name: String, - pub schema_key: String, + pub rb: RecordBatch, + pub origin_format: &'static str, + pub origin_size: u64, } // Events holds the schema related to a each event for a single log stream impl Event { pub async fn process(self) -> Result<(), 
EventError> { - let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name, &self.schema_key)?; - if let Some(schema) = stream_schema { - // validate schema before processing the event - let Ok(mut event) = self.get_record(Arc::clone(&schema)) else { - return Err(EventError::SchemaMismatch); - }; + let key = get_schema_key(&self.rb.schema().fields); - let rows = event.num_rows(); - let timestamp_array = Arc::new(get_timestamp_array(rows)); - event = replace(schema, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); + if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) { + commit_schema(&self.stream_name, self.rb.schema())?; + } - self.process_event(&event)?; - } else { - // if stream schema is none then it is first event, - // process first event and store schema in obect store - let schema = add_default_timestamp_field(self.infer_schema()?)?; - let schema_ref = Arc::new(schema.clone()); - let event = self.get_record(schema_ref.clone())?; - let timestamp_array = Arc::new(get_timestamp_array(event.num_rows())); - let event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array); - self.process_first_event(&event, schema)?; - }; + self.process_event(&key)?; metadata::STREAM_INFO.update_stats( &self.stream_name, - serde_json::to_vec(&self.body) - .map(|v| std::mem::size_of_val(v.as_slice())) - .unwrap_or(0) as u64, + self.origin_format, + self.origin_size, + self.rb.num_rows() as u64, )?; if let Err(e) = metadata::STREAM_INFO.check_alerts(&self).await { @@ -87,301 +69,92 @@ impl Event { Ok(()) } - // This is called when the first event of a log stream is received. The first event is - // special because we parse this event to generate the schema for the log stream. This - // schema is then enforced on rest of the events sent to this log stream. - fn process_first_event(&self, event: &RecordBatch, schema: Schema) -> Result<(), EventError> { - // note for functions _schema_with_map and _set_schema_with_map, - // these are to be called while holding a write lock specifically. - // this guarantees two things - // - no other metadata operation can happen in between - // - map always have an entry for this stream - - let stream_name = &self.stream_name; - let schema_key = &self.schema_key; - - let old = metadata::STREAM_INFO.merged_schema(stream_name)?; - if Schema::try_merge(vec![old, schema.clone()]).is_err() { - return Err(EventError::SchemaMismatch); - }; + fn is_first_event(&self, stream_schema: &Schema) -> bool { + let mut stream_fields = stream_schema.fields().iter(); + let event_schema = self.rb.schema(); + let event_fields = event_schema.fields(); + + for field in event_fields { + loop { + let Some(stream_field) = stream_fields.next() else { return true }; + if stream_field.name() == field.name() { + break; + } else { + continue; + } + } + } - commit_schema(stream_name, schema_key, Arc::new(schema))?; - self.process_event(event) + false } // event process all events after the 1st event. Concatenates record batches // and puts them in memory store for each event. - fn process_event(&self, rb: &RecordBatch) -> Result<(), EventError> { - STREAM_WRITERS::append_to_local(&self.stream_name, &self.schema_key, rb)?; + fn process_event(&self, schema_key: &str) -> Result<(), EventError> { + STREAM_WRITERS::append_to_local(&self.stream_name, schema_key, &self.rb)?; Ok(()) } - - // inferSchema is a constructor to Schema - // returns raw arrow schema type and arrow schema to string type. 
- fn infer_schema(&self) -> Result { - let iter = std::iter::once(Ok(self.body.clone())); - infer_json_schema_from_iterator(iter) - } - - fn get_record(&self, schema: Arc) -> Result { - let mut iter = std::iter::once(Ok(self.body.clone())); - if fields_mismatch(&schema, &self.body) { - return Err(EventError::SchemaMismatch); - } - let record = Decoder::new(schema, DecoderOptions::new()).next_batch(&mut iter)?; - - record.ok_or(EventError::MissingRecord) - } } -fn add_default_timestamp_field(schema: Schema) -> Result { - let schema = Schema::try_merge(vec![ - Schema::new(vec![Field::new( - DEFAULT_TIMESTAMP_KEY, - DataType::Timestamp(TimeUnit::Millisecond, None), - true, - )]), - schema, - ])?; - - Ok(schema) -} - -pub fn get_schema_key(body: &Value) -> String { - let mut list_of_fields: Vec<_> = body.as_object().unwrap().keys().collect(); - list_of_fields.sort(); +pub fn get_schema_key(fields: &Vec) -> String { + // Fields must be sorted let mut hasher = xxhash_rust::xxh3::Xxh3::new(); - for field in list_of_fields { - hasher.update(field.as_bytes()) + for field in fields { + hasher.update(field.name().as_bytes()) } let hash = hasher.digest(); format!("{hash:x}") } -fn fields_mismatch(schema: &Schema, body: &Value) -> bool { - for (name, val) in body.as_object().expect("body is of object variant") { - let Ok(field) = schema.field_with_name(name) else { return true }; - if !valid_type(field.data_type(), val) { - return true; - } - } - false -} - -fn valid_type(data_type: &DataType, value: &Value) -> bool { - match data_type { - DataType::Boolean => value.is_boolean(), - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(), - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(), - DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(), - DataType::Utf8 => value.is_string(), - DataType::List(field) => { - let data_type = field.data_type(); - if let Value::Array(arr) = value { - for elem in arr { - if !valid_type(data_type, elem) { - return false; - } - } - } - true - } - DataType::Struct(fields) => { - if let Value::Object(val) = value { - for (key, value) in val { - let field = (0..fields.len()) - .find(|idx| fields[*idx].name() == key) - .map(|idx| &fields[idx]); - - if let Some(field) = field { - if !valid_type(field.data_type(), value) { - return false; - } - } else { - return false; - } - } - true - } else { - false - } - } - _ => unreachable!(), - } -} +pub fn commit_schema(stream_name: &str, schema: Arc) -> Result<(), EventError> { + let mut stream_metadata = metadata::STREAM_INFO.write().expect("lock poisoned"); -fn commit_schema( - stream_name: &str, - schema_key: &str, - schema: Arc, -) -> Result<(), EventError> { - // note for methods .get_unchecked and .set_unchecked, - // these are to be called while holding a write lock specifically. - // this guarantees two things - // - no other metadata operation can happen in between - // - map always have an entry for this stream - - let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT); - // if the metadata is not none after acquiring lock - // then some other thread has already completed this function. 
- if stream_metadata - .get_unchecked(stream_name, schema_key) - .is_some() - { - // drop the lock - drop(stream_metadata); - // Nothing to do - Ok(()) - } else { - // set to map - stream_metadata.set_unchecked(stream_name, schema_key, schema); - // serialize map - let schema_map = serde_json::to_string( - &stream_metadata - .get(stream_name) - .expect("map has entry for this stream name") - .schema, - ) - .expect("map of schemas is serializable"); - // try to put to storage - let storage = CONFIG.storage().get_object_store(); - - let _stream_name = stream_name.to_owned(); - let handle = std::thread::spawn(move || { - let rt = actix_web::rt::System::new(); - rt.block_on(storage.put_schema_map(&_stream_name, &schema_map)) - }); - - let res = match handle.join() { - Ok(res) => res.map_err(EventError::ObjectStorage), - Err(_) => { - log::error!("commit schema thread panicked"); - Err(EventError::InternalError) - } - }; - // revert if err - if let Err(ref err) = res { - stream_metadata.remove_unchecked(stream_name, schema_key); - log::error!( - "Failed to commit schema during new event ingestion: {}", - err - ) - } - - res - } -} - -fn replace( - schema: Arc, - batch: RecordBatch, - column: &str, - arr: Arc, -) -> RecordBatch { - let (index, _) = schema.column_with_name(column).unwrap(); - let mut arrays = batch.columns().to_vec(); - arrays[index] = arr; - - RecordBatch::try_new(schema, arrays).unwrap() -} + let schema = Schema::try_merge(vec![ + schema.as_ref().clone(), + stream_metadata.get_unchecked(stream_name).as_ref().clone(), + ]) + .unwrap(); -fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { - let time = Utc::now(); - TimestampMillisecondArray::from_value(time.timestamp_millis(), size) + stream_metadata.set_unchecked(stream_name, Arc::new(schema)); + Ok(()) } trait UncheckedOp: DerefMut> { - fn get_unchecked(&self, stream_name: &str, schema_key: &str) -> Option> { - self.get(stream_name) + fn get_unchecked(&self, stream_name: &str) -> Arc { + let schema = &self + .get(stream_name) .expect("map has entry for this stream name") - .schema - .get(schema_key) - .cloned() - } + .schema; - fn set_unchecked(&mut self, stream_name: &str, schema_key: &str, schema: Arc) { - self.get_mut(stream_name) - .expect("map has entry for this stream name") - .schema - .insert(schema_key.to_string(), schema) - .is_some() - .then(|| panic!("collision")); + Arc::clone(schema) } - fn remove_unchecked(&mut self, stream_name: &str, schema_key: &str) { + fn set_unchecked(&mut self, stream_name: &str, schema: Arc) { self.get_mut(stream_name) .expect("map has entry for this stream name") - .schema - .remove(schema_key); + .schema = schema } } impl>> UncheckedOp for T {} pub mod error { + use arrow_schema::ArrowError; + use crate::metadata::error::stream_info::MetadataError; use crate::storage::ObjectStorageError; - use datafusion::arrow::error::ArrowError; use super::writer::errors::StreamWriterError; #[derive(Debug, thiserror::Error)] pub enum EventError { - #[error("Missing Record from event body")] - MissingRecord, #[error("Stream Writer Failed: {0}")] StreamWriter(#[from] StreamWriterError), #[error("Metadata Error: {0}")] Metadata(#[from] MetadataError), #[error("Stream Writer Failed: {0}")] Arrow(#[from] ArrowError), - #[error("Schema Mismatch")] - SchemaMismatch, #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), - #[error("Internal Error")] - InternalError, - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use 
datafusion::arrow::datatypes::{DataType, Field, Schema}; - use datafusion::arrow::{ - array::{Array, Int32Array}, - record_batch::RecordBatch, - }; - - use crate::event::replace; - - #[test] - fn check_replace() { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ]); - - let schema_ref = Arc::new(schema); - - let rb = RecordBatch::try_new( - schema_ref.clone(), - vec![ - Arc::new(Int32Array::from_value(0, 3)), - Arc::new(Int32Array::from_value(0, 3)), - Arc::new(Int32Array::from_value(0, 3)), - ], - ) - .unwrap(); - - let arr: Arc = Arc::new(Int32Array::from_value(0, 3)); - - let new_rb = replace(schema_ref.clone(), rb, "c", arr); - - assert_eq!(new_rb.schema(), schema_ref); - assert_eq!(new_rb.num_columns(), 3); - assert_eq!(new_rb.num_rows(), 3) } } diff --git a/server/src/event/format.rs b/server/src/event/format.rs new file mode 100644 index 000000000..630ac455d --- /dev/null +++ b/server/src/event/format.rs @@ -0,0 +1,116 @@ +#![allow(dead_code)] + +use std::sync::Arc; + +use anyhow::{anyhow, Error as AnyError}; +use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use chrono::Utc; + +use crate::utils; + +use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY}; + +pub mod json; + +type Tags = String; +type Metadata = String; + +pub trait EventFormat: Sized { + type Data; + fn to_data(self) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>; + fn decode(data: Self::Data, schema: Arc) -> Result; + fn into_recordbatch(self) -> Result { + let (data, mut schema, tags, metadata) = self.to_data()?; + + match tags_index(&schema) { + Ok(_) => return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)), + Err(index) => { + schema + .fields + .insert(index, Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)); + } + }; + + match metadata_index(&schema) { + Ok(_) => { + return Err(anyhow!( + "field {} is a reserved field", + DEFAULT_METADATA_KEY + )) + } + Err(index) => { + schema.fields.insert( + index, + Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true), + ); + } + }; + + match timestamp_index(&schema) { + Ok(_) => { + return Err(anyhow!( + "field {} is a reserved field", + DEFAULT_TIMESTAMP_KEY + )) + } + Err(index) => { + schema.fields.insert( + index, + Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + ); + } + }; + + let schema_ref = Arc::new(schema); + let rb = Self::decode(data, Arc::clone(&schema_ref))?; + let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows())); + let metadata_arr = + StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows())); + let timestamp_array = get_timestamp_array(rb.num_rows()); + + let rb = utils::arrow::replace_columns( + Arc::clone(&schema_ref), + rb, + &[ + timestamp_index(&schema_ref).expect("timestamp field exists"), + tags_index(&schema_ref).expect("tags field exists"), + metadata_index(&schema_ref).expect("metadata field exists"), + ], + &[ + Arc::new(timestamp_array), + Arc::new(tags_arr), + Arc::new(metadata_arr), + ], + ); + + Ok(rb) + } +} + +fn tags_index(schema: &Schema) -> Result { + schema + .fields + .binary_search_by_key(&DEFAULT_TAGS_KEY, |field| field.name()) +} + +fn metadata_index(schema: &Schema) -> Result { + schema + .fields + .binary_search_by_key(&DEFAULT_METADATA_KEY, |field| field.name()) +} + +fn 
timestamp_index(schema: &Schema) -> Result { + schema + .fields + .binary_search_by_key(&DEFAULT_TIMESTAMP_KEY, |field| field.name()) +} + +fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { + let time = Utc::now(); + TimestampMillisecondArray::from_value(time.timestamp_millis(), size) +} diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs new file mode 100644 index 000000000..96f9c20f2 --- /dev/null +++ b/server/src/event/format/json.rs @@ -0,0 +1,138 @@ +#![allow(deprecated)] + +use anyhow::anyhow; +use arrow_array::RecordBatch; +use arrow_json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions}; +use arrow_schema::{Field, Schema}; +use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; +use serde_json::Value; +use std::sync::Arc; + +use super::EventFormat; +use crate::{metadata::STREAM_INFO, utils::json::flatten_json_body}; + +pub struct Event { + pub stream_name: String, + pub data: Value, + pub tags: String, + pub metadata: String, +} + +impl EventFormat for Event { + type Data = Vec; + + fn to_data(self) -> Result<(Self::Data, Schema, String, String), anyhow::Error> { + let data = flatten_json_body(self.data)?; + + let stream_schema = get_stream_schema(&self.stream_name); + + let value_arr = match data { + Value::Array(arr) => arr, + value @ Value::Object(_) => vec![value], + _ => unreachable!("flatten would have failed beforehand"), + }; + + let fields = + collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); + + let schema = match derive_sub_schema(stream_schema, fields) { + Ok(schema) => schema, + Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) { + Ok(mut infer_schema) => { + infer_schema + .fields + .sort_by(|field1, field2| Ord::cmp(field1.name(), field2.name())); + + if let Err(err) = Schema::try_merge(vec![ + get_stream_schema(&self.stream_name), + infer_schema.clone(), + ]) { + return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err)); + } + infer_schema + } + Err(err) => { + return Err(anyhow!( + "Could not infer schema for this event due to err {:?}", + err + )) + } + }, + }; + + Ok((value_arr, schema, self.tags, self.metadata)) + } + + fn decode(data: Self::Data, schema: Arc) -> Result { + let array_capacity = round_upto_multiple_of_64(data.len()); + let value_iter: &mut (dyn Iterator) = &mut data.into_iter(); + + let reader = Decoder::new( + schema, + DecoderOptions::new().with_batch_size(array_capacity), + ); + match reader.next_batch(&mut value_iter.map(Ok)) { + Ok(Some(recordbatch)) => Ok(recordbatch), + Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)), + Ok(None) => unreachable!("all records are added to one rb"), + } + } +} + +fn get_stream_schema(stream_name: &str) -> Schema { + STREAM_INFO.schema(stream_name).unwrap().as_ref().clone() +} + +// invariants for this to work. 
+// All fields in existing schema and fields in event are sorted my name lexographically +fn derive_sub_schema(schema: arrow_schema::Schema, fields: Vec<&str>) -> Result { + let fields = derive_subset(schema.fields, fields)?; + Ok(Schema::new(fields)) +} + +fn derive_subset(superset: Vec, subset: Vec<&str>) -> Result, ()> { + let mut superset_idx = 0; + let mut subset_idx = 0; + let mut subset_schema = Vec::with_capacity(subset.len()); + + while superset_idx < superset.len() && subset_idx < subset.len() { + let field = superset[superset_idx].clone(); + let key = subset[subset_idx]; + if field.name() == key { + subset_schema.push(field); + superset_idx += 1; + subset_idx += 1; + } else if field.name().as_str() < key { + superset_idx += 1; + } else { + return Err(()); + } + } + + // error if subset is not exhausted + if subset_idx < subset.len() { + return Err(()); + } + + Ok(subset_schema) +} + +// Must be in sorted order +fn collect_keys<'a>(values: impl Iterator) -> Result, ()> { + let mut sorted_keys = Vec::new(); + for value in values { + if let Some(obj) = value.as_object() { + for key in obj.keys() { + match sorted_keys.binary_search(&key.as_str()) { + Ok(_) => (), + Err(pos) => { + sorted_keys.insert(pos, key.as_str()); + } + } + } + } else { + return Err(()); + } + } + Ok(sorted_keys) +} diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 493b0b971..bada41f55 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -17,7 +17,8 @@ * */ -use datafusion::arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch}; +use arrow_array::RecordBatch; +use arrow_ipc::writer::StreamWriter; use lazy_static::lazy_static; use std::borrow::Borrow; use std::collections::HashMap; @@ -188,7 +189,7 @@ fn init_new_stream_writer_file( } pub mod errors { - use datafusion::arrow::error::ArrowError; + use arrow_schema::ArrowError; #[derive(Debug, thiserror::Error)] pub enum StreamWriterError { diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index fe3be656b..982d80e08 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -133,7 +133,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .route(web::post().to(ingest::post_event)) // DELETE "/logstream/{logstream}" ==> Delete log stream .route(web::delete().to(logstream::delete)) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) .service( web::resource("/alert") @@ -167,7 +167,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .service( web::resource("/ingest") .route(web::post().to(ingest::ingest)) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command .service(web::resource("/liveness").route(web::get().to(health_check::liveness))) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 669abfac2..81fc6150f 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -17,20 +17,21 @@ */ use actix_web::http::header::ContentType; -use actix_web::{web, HttpRequest, HttpResponse}; +use actix_web::{HttpRequest, HttpResponse}; +use bytes::Bytes; use http::StatusCode; use serde_json::Value; -use crate::event; use 
+use crate::event::format::EventFormat;
+use crate::event::{self, format};
 use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
-use crate::utils::json::{flatten_json_body, merge};
 
-pub async fn ingest(
-    req: HttpRequest,
-    body: web::Json<serde_json::Value>,
-) -> Result<HttpResponse, PostError> {
+// Handler for POST /api/v1/ingest
+// ingests events by extracting stream name from header
+// creates stream if it does not exist
+pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     if let Some((_, stream_name)) = req
         .headers()
         .iter()
@@ -50,75 +51,57 @@ pub async fn ingest(
 // Handler for POST /api/v1/logstream/{logstream}
 // only ingests events into the specified logstream
 // fails if the logstream does not exist
-pub async fn post_event(
-    req: HttpRequest,
-    body: web::Json<serde_json::Value>,
-) -> Result<HttpResponse, PostError> {
+pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     push_logs(stream_name, req, body).await?;
     Ok(HttpResponse::Ok().finish())
 }
 
-async fn push_logs(
-    stream_name: String,
-    req: HttpRequest,
-    body: web::Json<serde_json::Value>,
-) -> Result<(), PostError> {
-    let tags_n_metadata = [
-        (
-            "p_tags".to_string(),
-            Value::String(collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?),
-        ),
-        (
-            "p_metadata".to_string(),
-            Value::String(collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?),
-        ),
-    ];
+async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
+    let (size, rb) = into_event_batch(req, body, stream_name.clone())?;
 
-    match body.0 {
-        Value::Array(array) => {
-            for mut body in array {
-                merge(&mut body, tags_n_metadata.clone().into_iter());
-                let body = flatten_json_body(body).map_err(|_| PostError::FlattenError)?;
-                let schema_key = event::get_schema_key(&body);
-
-                let event = event::Event {
-                    body,
-                    stream_name: stream_name.clone(),
-                    schema_key,
-                };
-
-                event.process().await?;
-            }
-        }
-        mut body @ Value::Object(_) => {
-            merge(&mut body, tags_n_metadata.into_iter());
-            let body = flatten_json_body(body).map_err(|_| PostError::FlattenError)?;
-            let schema_key = event::get_schema_key(&body);
-            let event = event::Event {
-                body,
-                stream_name,
-                schema_key,
-            };
-
-            event.process().await?;
-        }
-        _ => return Err(PostError::Invalid),
+    event::Event {
+        rb,
+        stream_name,
+        origin_format: "json",
+        origin_size: size as u64,
     }
+    .process()
+    .await?;
 
     Ok(())
 }
 
+// This function is decoupled from handler itself for testing purpose
+fn into_event_batch(
+    req: HttpRequest,
+    body: Bytes,
+    stream_name: String,
+) -> Result<(usize, arrow_array::RecordBatch), PostError> {
+    let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
+    let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
+    let size = body.len();
+    let body: Value = serde_json::from_slice(&body)?;
+    let event = format::json::Event {
+        stream_name,
+        data: body,
+        tags,
+        metadata,
+    };
+    let rb = event.into_recordbatch()?;
+    Ok((size, rb))
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum PostError {
+    #[error("Could not deserialize into JSON object, {0}")]
+    SerdeError(#[from] serde_json::Error),
     #[error("Header Error: {0}")]
     Header(#[from] ParseHeaderError),
     #[error("Event Error: {0}")]
     Event(#[from] EventError),
-    #[error("Invalid Request")]
-    Invalid,
-    #[error("failed to flatten the json object")]
-    FlattenError,
+    #[error("Invalid Request: {0}")]
+    Invalid(#[from]
anyhow::Error), #[error("Failed to create stream due to {0}")] CreateStream(Box), } @@ -126,11 +109,11 @@ pub enum PostError { impl actix_web::ResponseError for PostError { fn status_code(&self) -> http::StatusCode { match self { + PostError::SerdeError(_) => StatusCode::BAD_REQUEST, PostError::Header(_) => StatusCode::BAD_REQUEST, PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR, - PostError::Invalid => StatusCode::BAD_REQUEST, + PostError::Invalid(_) => StatusCode::BAD_REQUEST, PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, - PostError::FlattenError => StatusCode::BAD_REQUEST, } } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index dd9e41d95..9274b2403 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -71,7 +71,9 @@ pub async fn list(_: HttpRequest) -> impl Responder { pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let schema = STREAM_INFO.merged_schema(&stream_name)?; + let schema = STREAM_INFO.schema(&stream_name)?; + let schema = serde_json::to_value(&schema).unwrap(); + Ok((web::Json(schema), StatusCode::OK)) } @@ -151,7 +153,7 @@ pub async fn put_alert( return Err(StreamError::UninitializedLogstream); } - let schema = STREAM_INFO.merged_schema(&stream_name)?; + let schema = STREAM_INFO.schema(&stream_name)?; for alert in &alerts.alerts { let column = alert.message.extract_column_name(); let is_valid = alert.message.valid(&schema, column); diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 628e69481..13f7b4a33 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -16,7 +16,7 @@ * */ -use datafusion::arrow::datatypes::Schema; +use arrow_schema::Schema; use lazy_static::lazy_static; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -25,7 +25,7 @@ use crate::alerts::Alerts; use crate::event::Event; use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE}; use crate::stats::{Stats, StatsCounter}; -use crate::storage::ObjectStorage; +use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; @@ -37,13 +37,23 @@ lazy_static! { RwLock::new(HashMap::new()); } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct LogStreamMetadata { - pub schema: HashMap>, + pub schema: Arc, pub alerts: Alerts, pub stats: StatsCounter, } +impl Default for LogStreamMetadata { + fn default() -> Self { + Self { + schema: Arc::new(Schema::empty()), + alerts: Alerts::default(), + stats: StatsCounter::default(), + } + } +} + // It is very unlikely that panic will occur when dealing with metadata. 
pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding a lock"; @@ -64,7 +74,7 @@ impl STREAM_INFO { ))?; for alert in &meta.alerts.alerts { - alert.check_alert(event.stream_name.clone(), &event.body) + alert.check_alert(event.stream_name.clone(), event.rb.clone()) } Ok(()) @@ -76,46 +86,17 @@ impl STREAM_INFO { } pub fn stream_initialized(&self, stream_name: &str) -> Result { - Ok(!self.schema_map(stream_name)?.is_empty()) + Ok(!self.schema(stream_name)?.fields.is_empty()) } - pub fn schema( - &self, - stream_name: &str, - schema_key: &str, - ) -> Result>, MetadataError> { + pub fn schema(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); - let schemas = map + let schema = map .get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) .map(|metadata| &metadata.schema)?; - Ok(schemas.get(schema_key).cloned()) - } - - pub fn schema_map( - &self, - stream_name: &str, - ) -> Result>, MetadataError> { - let map = self.read().expect(LOCK_EXPECT); - let schemas = map - .get(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.schema.clone())?; - - Ok(schemas) - } - - pub fn merged_schema(&self, stream_name: &str) -> Result { - let map = self.read().expect(LOCK_EXPECT); - let schemas = &map - .get(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))? - .schema; - let schema = Schema::try_merge(schemas.values().map(|schema| &**schema).cloned()) - .expect("mergeable schemas"); - - Ok(schema) + Ok(Arc::clone(schema)) } pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { @@ -148,9 +129,11 @@ impl STREAM_INFO { for stream in storage.list_streams().await? 
{ let alerts = storage.get_alerts(&stream.name).await?; - let schema = storage.get_schema_map(&stream.name).await?; + let schema = storage.get_schema(&stream.name).await?; let stats = storage.get_stats(&stream.name).await?; + let schema = Arc::new(update_schema_from_staging(&stream.name, schema)); + let metadata = LogStreamMetadata { schema, alerts, @@ -173,19 +156,25 @@ impl STREAM_INFO { .collect() } - pub fn update_stats(&self, stream_name: &str, size: u64) -> Result<(), MetadataError> { + pub fn update_stats( + &self, + stream_name: &str, + origin: &'static str, + size: u64, + num_rows: u64, + ) -> Result<(), MetadataError> { let map = self.read().expect(LOCK_EXPECT); let stream = map .get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?; stream.stats.add_ingestion_size(size); - stream.stats.increase_event_by_one(); + stream.stats.increase_event_by_n(num_rows); EVENTS_INGESTED - .with_label_values(&[stream_name.clone(), "json"]) + .with_label_values(&[stream_name.clone(), origin]) .inc(); EVENTS_INGESTED_SIZE - .with_label_values(&[stream_name.clone(), "json"]) + .with_label_values(&[stream_name.clone(), origin]) .add(size as i64); Ok(()) @@ -200,6 +189,15 @@ impl STREAM_INFO { } } +fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema { + let staging_files = StorageDir::new(stream_name).arrow_files(); + let schema = MergedRecordReader::try_new(&staging_files) + .unwrap() + .merged_schema(); + + Schema::try_merge(vec![schema, current_schema]).unwrap() +} + pub mod error { pub mod stream_info { use crate::storage::ObjectStorageError; diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs index 10a701435..18a301ec3 100644 --- a/server/src/migration/schema_migration.rs +++ b/server/src/migration/schema_migration.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; -use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use itertools::Itertools; pub(super) fn v1_v2(schema: Option) -> anyhow::Result> { diff --git a/server/src/option.rs b/server/src/option.rs index ab1687059..717a3ea7e 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -431,16 +431,16 @@ pub enum Compression { ZSTD, } -impl From for datafusion::parquet::basic::Compression { +impl From for parquet::basic::Compression { fn from(value: Compression) -> Self { match value { - Compression::UNCOMPRESSED => datafusion::parquet::basic::Compression::UNCOMPRESSED, - Compression::SNAPPY => datafusion::parquet::basic::Compression::SNAPPY, - Compression::GZIP => datafusion::parquet::basic::Compression::GZIP, - Compression::LZO => datafusion::parquet::basic::Compression::LZO, - Compression::BROTLI => datafusion::parquet::basic::Compression::BROTLI, - Compression::LZ4 => datafusion::parquet::basic::Compression::LZ4, - Compression::ZSTD => datafusion::parquet::basic::Compression::ZSTD, + Compression::UNCOMPRESSED => parquet::basic::Compression::UNCOMPRESSED, + Compression::SNAPPY => parquet::basic::Compression::SNAPPY, + Compression::GZIP => parquet::basic::Compression::GZIP, + Compression::LZO => parquet::basic::Compression::LZO, + Compression::BROTLI => parquet::basic::Compression::BROTLI, + Compression::LZ4 => parquet::basic::Compression::LZ4, + Compression::ZSTD => parquet::basic::Compression::ZSTD, } } } diff --git a/server/src/query.rs b/server/src/query.rs index 922f05b87..897b2699b 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -46,7 
+46,7 @@ fn get_value(value: &Value, key: Key) -> Result<&str, Key> { pub struct Query { pub query: String, pub stream_name: String, - pub merged_schema: Arc, + pub schema: Arc, pub start: DateTime, pub end: DateTime, } @@ -76,7 +76,7 @@ impl Query { } pub fn get_schema(&self) -> &Schema { - &self.merged_schema + &self.schema } /// Execute query on object storage(and if necessary on cache as well) with given stream information diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index a3d76e75d..7dcb19c5e 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -215,7 +215,7 @@ fn load_arrows( for file in files { let Ok(arrow_file) = File::open(file) else { return false; }; let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return false; }; - stream_readers.push(reader); + stream_readers.push(reader.into()); } let reader = crate::storage::MergedRecordReader { diff --git a/server/src/stats.rs b/server/src/stats.rs index 8d3e1cd45..945f83831 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -18,8 +18,6 @@ use std::sync::atomic::{AtomicU64, Ordering}; -use serde::{Deserialize, Serialize}; - #[derive(Debug)] pub struct StatsCounter { pub events_ingested: AtomicU64, @@ -73,13 +71,13 @@ impl StatsCounter { self.storage_size.fetch_add(size, Ordering::AcqRel); } - pub fn increase_event_by_one(&self) { - self.events_ingested.fetch_add(1, Ordering::Relaxed); + pub fn increase_event_by_n(&self, n: u64) { + self.events_ingested.fetch_add(n, Ordering::Relaxed); } } /// Helper struct type created by copying stats values from metadata -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] pub struct Stats { pub events: u64, pub ingestion: u64, diff --git a/server/src/storage.rs b/server/src/storage.rs index 6eaf3af11..7c2c28963 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -28,7 +28,6 @@ use datafusion::arrow::error::ArrowError; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::parquet::errors::ParquetError; use lazy_static::lazy_static; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::create_dir_all; @@ -68,7 +67,7 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000; // const PERMISSIONS_READ_WRITE: &str = "readwrite"; const ACCESS_ALL: &str = "all"; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct ObjectStoreFormat { /// Version of schema registry pub version: String, @@ -82,7 +81,7 @@ pub struct ObjectStoreFormat { pub stats: Stats, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Owner { pub id: String, pub group: String, @@ -94,7 +93,7 @@ impl Owner { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Permisssion { pub id: String, pub group: String, @@ -196,7 +195,7 @@ impl CACHED_FILES { } } -#[derive(Serialize)] +#[derive(serde::Serialize)] pub struct LogStream { pub name: String, } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 78f906ca2..4b74e9077 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -30,23 +30,19 @@ use 
crate::{ }; use actix_web_prometheus::PrometheusMetrics; +use arrow_array::{RecordBatch, TimestampMillisecondArray}; +use arrow_ipc::reader::StreamReader; +use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{ - arrow::datatypes::Schema, - parquet::{basic::Encoding, schema::types::ColumnPath}, -}; -use datafusion::{ - arrow::{ - array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch, - }, - datasource::listing::ListingTable, - error::DataFusionError, - execution::runtime_env::RuntimeEnv, - parquet::{arrow::ArrowWriter, file::properties::WriterProperties}, + datasource::listing::ListingTable, error::DataFusionError, execution::runtime_env::RuntimeEnv, }; use itertools::kmerge_by; -use lazy_static::__Deref; +use parquet::{ + arrow::ArrowWriter, basic::Encoding, file::properties::WriterProperties, + schema::types::ColumnPath, +}; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde_json::Value; @@ -92,16 +88,13 @@ pub trait ObjectStorage: Sync + 'static { schema: Arc, ) -> Result, DataFusionError>; - async fn put_schema_map( + async fn put_schema( &self, stream_name: &str, - schema_map: &str, + schema: &Schema, ) -> Result<(), ObjectStorageError> { - self.put_object( - &schema_path(stream_name), - Bytes::copy_from_slice(schema_map.as_bytes()), - ) - .await?; + self.put_object(&schema_path(stream_name), to_bytes(schema)) + .await?; Ok(()) } @@ -114,11 +107,8 @@ pub trait ObjectStorage: Sync + 'static { let format_json = to_bytes(&format); - self.put_object( - &schema_path(stream_name), - to_bytes(&HashMap::::new()), - ) - .await?; + self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty())) + .await?; self.put_object(&stream_json_path(stream_name), format_json) .await?; @@ -172,44 +162,9 @@ pub trait ObjectStorage: Sync + 'static { .await } - async fn get_schema_map( - &self, - stream_name: &str, - ) -> Result>, ObjectStorageError> { + async fn get_schema(&self, stream_name: &str) -> Result { let schema_map = self.get_object(&schema_path(stream_name)).await?; - if let Ok(schema_map) = serde_json::from_slice(&schema_map) { - Ok(schema_map) - } else { - // fix for schema metadata serialize - let mut schema_map: serde_json::Value = - serde_json::from_slice(&schema_map).expect("valid json"); - - for schema in schema_map - .as_object_mut() - .expect("schema map is json object") - .values_mut() - { - let map = schema.as_object_mut().unwrap(); - map.insert( - "metadata".to_string(), - Value::Object(serde_json::Map::new()), - ); - - for field in schema["fields"] - .as_array_mut() - .expect("fields is json array") - { - let map = field.as_object_mut().unwrap(); - map.insert( - "metadata".to_string(), - Value::Object(serde_json::Map::new()), - ); - } - } - - Ok(serde_json::from_value(schema_map) - .expect("should be deserializable after alteration")) - } + Ok(serde_json::from_slice(&schema_map)?) 
} async fn get_alerts(&self, stream_name: &str) -> Result { @@ -329,12 +284,13 @@ pub trait ObjectStorage: Sync + 'static { } let record_reader = MergedRecordReader::try_new(&files).unwrap(); - - let mut parquet_table = CACHED_FILES.lock().unwrap(); - let parquet_file = - fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - parquet_table.upsert(&parquet_path); - + let parquet_file = { + let mut parquet_table = CACHED_FILES.lock().unwrap(); + let parquet_file = + fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + parquet_table.upsert(&parquet_path); + parquet_file + }; let props = WriterProperties::builder() .set_max_row_group_size(CONFIG.parseable.row_group_size) .set_compression(CONFIG.parseable.parquet_compression.into()) @@ -343,7 +299,9 @@ pub trait ObjectStorage: Sync + 'static { Encoding::DELTA_BINARY_PACKED, ) .build(); - let schema = Arc::new(record_reader.merged_schema()); + let merged_schema = record_reader.merged_schema(); + commit_schema_to_storage(stream, merged_schema.clone()).await?; + let schema = Arc::new(merged_schema); let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; for ref record in record_reader.merged_iter(&schema) { @@ -438,9 +396,40 @@ pub trait ObjectStorage: Sync + 'static { } } +async fn commit_schema_to_storage( + stream_name: &str, + schema: Schema, +) -> Result<(), ObjectStorageError> { + let storage = CONFIG.storage().get_object_store(); + let stream_schema = storage.get_schema(stream_name).await?; + let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap(); + storage.put_schema(stream_name, &new_schema).await +} + +#[derive(Debug)] +pub struct Reader { + reader: StreamReader, + timestamp_col_index: usize, +} + +impl From> for Reader { + fn from(reader: StreamReader) -> Self { + let timestamp_col_index = reader + .schema() + .all_fields() + .binary_search_by(|field| field.name().as_str().cmp("p_timestamp")) + .expect("schema should have this field"); + + Self { + reader, + timestamp_col_index, + } + } +} + #[derive(Debug)] pub struct MergedRecordReader { - pub readers: Vec>, + pub readers: Vec, } impl MergedRecordReader { @@ -449,38 +438,46 @@ impl MergedRecordReader { for file in files { let reader = StreamReader::try_new(File::open(file).unwrap(), None)?; - readers.push(reader); + readers.push(reader.into()); } Ok(Self { readers }) } pub fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { - let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten()); - - kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| { - let a: &TimestampMillisecondArray = a - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - let b: &TimestampMillisecondArray = b - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - a.value(0) < b.value(0) - }) - .map(|batch| adapt_batch(schema, batch)) + let adapted_readers = self.readers.into_iter().map(move |reader| { + reader + .reader + .flatten() + .zip(std::iter::repeat(reader.timestamp_col_index)) + }); + + kmerge_by( + adapted_readers, + |(a, a_col): &(RecordBatch, usize), (b, b_col): &(RecordBatch, usize)| { + let a: &TimestampMillisecondArray = a + .column(*a_col) + .as_any() + .downcast_ref::() + .unwrap(); + + let b: &TimestampMillisecondArray = b + .column(*b_col) + .as_any() + .downcast_ref::() + .unwrap(); + + a.value(0) < b.value(0) + }, + ) + .map(|(batch, _)| adapt_batch(schema, batch)) } pub fn merged_schema(&self) -> Schema { Schema::try_merge( self.readers .iter() - .map(|stream| 
stream.schema().deref().clone()), + .map(|reader| reader.reader.schema().as_ref().clone()), ) .unwrap() } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d44a1a436..9e2942659 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -33,7 +33,9 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use datafusion::datasource::object_store::ObjectStoreRegistry; +use datafusion::datasource::object_store::{ + DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl, +}; use datafusion::error::DataFusionError; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use futures::stream::FuturesUnordered; @@ -120,8 +122,9 @@ impl ObjectStorageProvider for S3Config { // limit objectstore to a concurrent request limit let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); - let object_store_registry = ObjectStoreRegistry::new(); - object_store_registry.register_store("s3", &self.bucket_name, Arc::new(s3)); + let object_store_registry = DefaultObjectStoreRegistry::new(); + let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap(); + object_store_registry.register_store(url.as_ref(), Arc::new(s3)); let config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)); diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 31262ea93..498861407 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -22,7 +22,6 @@ use std::{ }; use once_cell::sync::OnceCell; -use serde::{Deserialize, Serialize}; use std::io; use crate::{option::CONFIG, utils::uid}; @@ -31,7 +30,7 @@ use super::object_storage::PARSEABLE_METADATA_FILE_NAME; pub static STORAGE_METADATA: OnceCell = OnceCell::new(); -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct StorageMetadata { pub version: String, pub mode: String, @@ -43,7 +42,7 @@ pub struct StorageMetadata { pub stream: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct User { username: String, password: String, diff --git a/server/src/utils.rs b/server/src/utils.rs index 4b4fb91e8..fca14cc1d 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -21,6 +21,7 @@ pub mod header_parsing; pub mod json; pub mod uid; pub mod update; +pub mod arrow; use std::path::Path; diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs new file mode 100644 index 000000000..d27bbc1b0 --- /dev/null +++ b/server/src/utils/arrow.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use arrow_array::{Array, RecordBatch}; +use arrow_schema::Schema; +use itertools::Itertools; + +pub fn replace_columns( + schema: Arc, + batch: RecordBatch, + indexes: &[usize], + arrays: &[Arc], +) -> RecordBatch { + let mut batch_arrays = batch.columns().iter().map(Arc::clone).collect_vec(); + for (&index, arr) in indexes.iter().zip(arrays.iter()) { + batch_arrays[index] = Arc::clone(arr); + } + RecordBatch::try_new(schema, batch_arrays).unwrap() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Array, Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + + use super::replace_columns; + + #[test] + fn check_replace() { + let schema = Schema::new(vec![ 
+ Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + let schema_ref = Arc::new(schema); + + let rb = RecordBatch::try_new( + schema_ref.clone(), + vec![ + Arc::new(Int32Array::from_value(0, 3)), + Arc::new(Int32Array::from_value(0, 3)), + Arc::new(Int32Array::from_value(0, 3)), + ], + ) + .unwrap(); + + let arr: Arc = Arc::new(Int32Array::from_value(0, 3)); + + let new_rb = replace_columns(schema_ref.clone(), rb, &[2], &[arr]); + + assert_eq!(new_rb.schema(), schema_ref); + assert_eq!(new_rb.num_columns(), 3); + assert_eq!(new_rb.num_rows(), 3) + } +} diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index 8a0af86a0..75e2b5161 100644 --- a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -21,25 +21,10 @@ use serde_json::Value; pub mod flatten; -pub fn flatten_json_body(body: Value) -> Result { +pub fn flatten_json_body(body: serde_json::Value) -> Result { flatten::flatten(body, "_") } -pub fn merge(value: &mut Value, fields: impl Iterator) { - if let Value::Object(m) = value { - for (k, v) in fields { - match m.get_mut(&k) { - Some(val) => { - *val = v; - } - None => { - m.insert(k, v); - } - } - } - } -} - pub fn convert_to_string(value: &Value) -> Value { match value { Value::Null => Value::String("null".to_owned()), diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs index da2a96dcc..77fba0b5b 100644 --- a/server/src/utils/json/flatten.rs +++ b/server/src/utils/json/flatten.rs @@ -16,30 +16,42 @@ * */ +use anyhow::anyhow; use itertools::Itertools; use serde_json::map::Map; use serde_json::value::Value; -pub fn flatten(nested_value: Value, separator: &str) -> Result { - let mut map = Map::new(); - if let Value::Object(nested_dict) = nested_value { - flatten_object(&mut map, None, nested_dict, separator)?; - } else { - return Err(()); +pub fn flatten(nested_value: Value, separator: &str) -> Result { + match nested_value { + Value::Object(nested_dict) => { + let mut map = Map::new(); + flatten_object(&mut map, None, nested_dict, separator)?; + Ok(Value::Object(map)) + } + Value::Array(mut arr) => { + for _value in &mut arr { + let value = std::mem::replace(_value, Value::Null); + let mut map = Map::new(); + let Value::Object(obj) = value else { return Err(anyhow!("Expected object in array of objects")) }; + flatten_object(&mut map, None, obj, separator)?; + *_value = Value::Object(map); + } + Ok(Value::Array(arr)) + } + _ => Err(anyhow!("Cannot flatten this JSON")), } - Ok(Value::Object(map)) } pub fn flatten_with_parent_prefix( nested_value: Value, prefix: &str, separator: &str, -) -> Result { +) -> Result { let mut map = Map::new(); if let Value::Object(nested_dict) = nested_value { flatten_object(&mut map, Some(prefix), nested_dict, separator)?; } else { - return Err(()); + return Err(anyhow!("Must be an object")); } Ok(Value::Object(map)) } @@ -49,7 +61,7 @@ pub fn flatten_object( parent_key: Option<&str>, nested_dict: Map, separator: &str, -) -> Result<(), ()> { +) -> Result<(), anyhow::Error> { for (key, value) in nested_dict.into_iter() { let new_key = parent_key.map_or_else( || key.clone(), @@ -78,7 +90,7 @@ pub fn flatten_array_objects( parent_key: &str, arr: Vec, separator: &str, -) -> Result<(), ()> { +) -> Result<(), anyhow::Error> { let mut columns: Vec<(String, Vec)> = Vec::new(); let mut len = 0; for value in arr { @@ -119,7 +131,9 @@ pub fn flatten_array_objects( column.push(Value::Null) } } else { - return Err(()); + return 
Err(anyhow!( + "Found non object element while flattening array of object(s)", + )); } len += 1; } diff --git a/server/src/validator.rs b/server/src/validator.rs index 5ba92e6db..2b31fa53f 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -16,8 +16,6 @@ * */ -use std::sync::Arc; - use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; @@ -164,14 +162,14 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result Date: Wed, 5 Apr 2023 19:39:17 +0530 Subject: [PATCH 02/14] Add test --- server/src/event.rs | 6 +- server/src/event/format.rs | 6 +- server/src/event/format/json.rs | 23 ++-- server/src/handlers/http/ingest.rs | 164 ++++++++++++++++++++++++++++- 4 files changed, 176 insertions(+), 23 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 6617bbda3..83e7d76f2 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -32,9 +32,9 @@ use crate::metadata; use self::error::EventError; pub use self::writer::STREAM_WRITERS; -const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; -const DEFAULT_TAGS_KEY: &str = "p_tags"; -const DEFAULT_METADATA_KEY: &str = "p_metadata"; +pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; +pub const DEFAULT_TAGS_KEY: &str = "p_tags"; +pub const DEFAULT_METADATA_KEY: &str = "p_metadata"; #[derive(Clone)] pub struct Event { diff --git a/server/src/event/format.rs b/server/src/event/format.rs index 630ac455d..ba398977d 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -18,10 +18,10 @@ type Metadata = String; pub trait EventFormat: Sized { type Data; - fn to_data(self) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>; + fn to_data(self, schema: &Schema) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>; fn decode(data: Self::Data, schema: Arc) -> Result; - fn into_recordbatch(self) -> Result { - let (data, mut schema, tags, metadata) = self.to_data()?; + fn into_recordbatch(self, schema: &Schema) -> Result { + let (data, mut schema, tags, metadata) = self.to_data(schema)?; match tags_index(&schema) { Ok(_) => return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)), diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs index 96f9c20f2..05a7223ee 100644 --- a/server/src/event/format/json.rs +++ b/server/src/event/format/json.rs @@ -9,10 +9,9 @@ use serde_json::Value; use std::sync::Arc; use super::EventFormat; -use crate::{metadata::STREAM_INFO, utils::json::flatten_json_body}; +use crate::utils::json::flatten_json_body; pub struct Event { - pub stream_name: String, pub data: Value, pub tags: String, pub metadata: String, @@ -21,10 +20,13 @@ pub struct Event { impl EventFormat for Event { type Data = Vec; - fn to_data(self) -> Result<(Self::Data, Schema, String, String), anyhow::Error> { + fn to_data( + self, + schema: &Schema, + ) -> Result<(Self::Data, Schema, String, String), anyhow::Error> { let data = flatten_json_body(self.data)?; - let stream_schema = get_stream_schema(&self.stream_name); + let stream_schema = schema; let value_arr = match data { Value::Array(arr) => arr, @@ -35,7 +37,7 @@ impl EventFormat for Event { let fields = collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); - let schema = match derive_sub_schema(stream_schema, fields) { + let schema = match derive_sub_schema(stream_schema.clone(), fields) { Ok(schema) => schema, Err(_) => match 
infer_json_schema_from_iterator(value_arr.iter().map(Ok)) { Ok(mut infer_schema) => { @@ -43,10 +45,9 @@ impl EventFormat for Event { .fields .sort_by(|field1, field2| Ord::cmp(field1.name(), field2.name())); - if let Err(err) = Schema::try_merge(vec![ - get_stream_schema(&self.stream_name), - infer_schema.clone(), - ]) { + if let Err(err) = + Schema::try_merge(vec![stream_schema.clone(), infer_schema.clone()]) + { return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err)); } infer_schema @@ -79,10 +80,6 @@ impl EventFormat for Event { } } -fn get_stream_schema(stream_name: &str) -> Schema { - STREAM_INFO.schema(stream_name).unwrap().as_ref().clone() -} - // invariants for this to work. // All fields in existing schema and fields in event are sorted my name lexographically fn derive_sub_schema(schema: arrow_schema::Schema, fields: Vec<&str>) -> Result { diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 81fc6150f..6eefc2301 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,8 +16,11 @@ * */ +use std::sync::Arc; + use actix_web::http::header::ContentType; use actix_web::{HttpRequest, HttpResponse}; +use arrow_schema::Schema; use bytes::Bytes; use http::StatusCode; use serde_json::Value; @@ -26,6 +29,7 @@ use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY}; +use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; // Handler for POST /api/v1/ingest @@ -58,7 +62,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let (size, rb) = into_event_batch(req, body, stream_name.clone())?; + let (size, rb) = into_event_batch(req, body, &get_stream_schema(&stream_name))?; event::Event { rb, @@ -76,22 +80,25 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result fn into_event_batch( req: HttpRequest, body: Bytes, - stream_name: String, + schema: &Schema, ) -> Result<(usize, arrow_array::RecordBatch), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; let size = body.len(); let body: Value = serde_json::from_slice(&body)?; let event = format::json::Event { - stream_name, data: body, tags, metadata, }; - let rb = event.into_recordbatch()?; + let rb = event.into_recordbatch(schema)?; Ok((size, rb)) } +fn get_stream_schema(stream_name: &str) -> Arc { + STREAM_INFO.schema(stream_name).unwrap() +} + #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("Could not deserialize into JSON object, {0}")] @@ -123,3 +130,152 @@ impl actix_web::ResponseError for PostError { .body(self.to_string()) } } + +#[cfg(test)] +mod tests { + + use actix_web::test::TestRequest; + use arrow_array::{cast::as_string_array, Float64Array, Int64Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use bytes::Bytes; + use datafusion::common::cast::{as_float64_array, as_int64_array}; + use serde_json::json; + + use crate::{ + event, + handlers::{PREFIX_META, PREFIX_TAGS}, + }; + + use super::into_event_batch; + + #[test] + fn test_basic_object_into_rb() { + let json = json!({ + "a": 1, + "b": "hello", + "c": 4.23 + }); + + let req = TestRequest::default() + .append_header((PREFIX_TAGS.to_string() + "A", "tag1")) + 
.append_header((PREFIX_TAGS.to_string() + "B", "tag2")) + .append_header((PREFIX_META.to_string() + "C", "meta1")) + .to_http_request(); + + let (size, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &Schema::empty(), + ) + .unwrap(); + + assert_eq!(size, 28); + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 6); + assert_eq!( + as_int64_array(rb.column(0)).unwrap(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + as_string_array(rb.column(1)), + &StringArray::from_iter_values(["hello"]) + ); + assert_eq!( + as_float64_array(rb.column(2)).unwrap(), + &Float64Array::from_iter([4.23]) + ); + assert_eq!( + as_string_array(rb.column_by_name(event::DEFAULT_TAGS_KEY).unwrap()), + &StringArray::from_iter_values(["a=tag1^b=tag2"]) + ); + assert_eq!( + as_string_array(rb.column_by_name(event::DEFAULT_METADATA_KEY).unwrap()), + &StringArray::from_iter_values(["c=meta1"]) + ); + } + + #[test] + fn test_basic_object_with_null_into_rb() { + let json = json!({ + "a": 1, + "b": "hello", + "c": null + }); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &Schema::empty(), + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 5); + assert_eq!( + as_int64_array(rb.column(0)).unwrap(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + as_string_array(rb.column(1)), + &StringArray::from_iter_values(["hello"]) + ); + } + + #[test] + fn test_basic_object_derive_schema_into_rb() { + let json = json!({ + "a": 1, + "b": "hello", + }); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ]); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &schema, + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 5); + assert_eq!( + as_int64_array(rb.column(0)).unwrap(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + as_string_array(rb.column(1)), + &StringArray::from_iter_values(["hello"]) + ); + } + + #[test] + fn test_basic_object_schema_mismatch() { + let json = json!({ + "a": 1, + "b": 1, // type mismatch + }); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ]); + + let req = TestRequest::default().to_http_request(); + + dbg!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &schema, + )); + } +} From f4afbba5ca98f69bade86474923de02bb9dabdab Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:23:02 +0530 Subject: [PATCH 03/14] Add schema check --- server/src/event/format/json.rs | 67 ++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs index 05a7223ee..59534b26a 100644 --- a/server/src/event/format/json.rs +++ b/server/src/event/format/json.rs @@ -3,7 +3,7 @@ use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions}; -use arrow_schema::{Field, Schema}; +use arrow_schema::{DataType, Field, Schema}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use serde_json::Value; use std::sync::Arc; @@ -61,6 +61,15 @@ impl EventFormat for Event { }, }; + if 
value_arr + .iter() + .any(|value| fields_mismatch(&schema, value)) + { + return Err(anyhow!( + "Could not process this event due to mismatch in datatype" + )); + } + Ok((value_arr, schema, self.tags, self.metadata)) } @@ -133,3 +142,59 @@ fn collect_keys<'a>(values: impl Iterator) -> Result bool { + for (name, val) in body.as_object().expect("body is of object variant") { + if val.is_null() { + continue; + } + + let Ok(field) = schema.field_with_name(name) else { return true }; + if !valid_type(field.data_type(), val) { + return true; + } + } + false +} + +fn valid_type(data_type: &DataType, value: &Value) -> bool { + match data_type { + DataType::Boolean => value.is_boolean(), + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(), + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(), + DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(), + DataType::Utf8 => value.is_string(), + DataType::List(field) => { + let data_type = field.data_type(); + if let Value::Array(arr) = value { + for elem in arr { + if !valid_type(data_type, elem) { + return false; + } + } + } + true + } + DataType::Struct(fields) => { + if let Value::Object(val) = value { + for (key, value) in val { + let field = (0..fields.len()) + .find(|idx| fields[*idx].name() == key) + .map(|idx| &fields[idx]); + + if let Some(field) = field { + if !valid_type(field.data_type(), value) { + return false; + } + } else { + return false; + } + } + true + } else { + false + } + } + _ => unreachable!(), + } +} From 1d3b583ab27d77c274306129b64cfabdd611e625 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:36:18 +0530 Subject: [PATCH 04/14] Add test --- server/src/handlers/http/ingest.rs | 341 ++++++++++++++++++++++++++--- 1 file changed, 315 insertions(+), 26 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 6eefc2301..cb480f5a8 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -135,10 +135,11 @@ impl actix_web::ResponseError for PostError { mod tests { use actix_web::test::TestRequest; - use arrow_array::{cast::as_string_array, Float64Array, Int64Array, StringArray}; + use arrow_array::{ + types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray, + }; use arrow_schema::{DataType, Field, Schema}; use bytes::Bytes; - use datafusion::common::cast::{as_float64_array, as_int64_array}; use serde_json::json; use crate::{ @@ -148,8 +149,28 @@ mod tests { use super::into_event_batch; + trait TestExt { + fn as_int64_arr(&self) -> &Int64Array; + fn as_float64_arr(&self) -> &Float64Array; + fn as_utf8_arr(&self) -> &StringArray; + } + + impl TestExt for ArrayRef { + fn as_int64_arr(&self) -> &Int64Array { + self.as_any().downcast_ref().unwrap() + } + + fn as_float64_arr(&self) -> &Float64Array { + self.as_any().downcast_ref().unwrap() + } + + fn as_utf8_arr(&self) -> &StringArray { + self.as_any().downcast_ref().unwrap() + } + } + #[test] - fn test_basic_object_into_rb() { + fn basic_object_into_rb() { let json = json!({ "a": 1, "b": "hello", @@ -172,30 +193,31 @@ mod tests { assert_eq!(size, 28); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 6); + assert_eq!(rb.column(0).as_int64_arr(), &Int64Array::from_iter([1])); assert_eq!( - as_int64_array(rb.column(0)).unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - as_string_array(rb.column(1)), + rb.column(1).as_utf8_arr(), 
&StringArray::from_iter_values(["hello"]) ); assert_eq!( - as_float64_array(rb.column(2)).unwrap(), + rb.column(2).as_float64_arr(), &Float64Array::from_iter([4.23]) ); assert_eq!( - as_string_array(rb.column_by_name(event::DEFAULT_TAGS_KEY).unwrap()), + rb.column_by_name(event::DEFAULT_TAGS_KEY) + .unwrap() + .as_utf8_arr(), &StringArray::from_iter_values(["a=tag1^b=tag2"]) ); assert_eq!( - as_string_array(rb.column_by_name(event::DEFAULT_METADATA_KEY).unwrap()), + rb.column_by_name(event::DEFAULT_METADATA_KEY) + .unwrap() + .as_utf8_arr(), &StringArray::from_iter_values(["c=meta1"]) ); } #[test] - fn test_basic_object_with_null_into_rb() { + fn basic_object_with_null_into_rb() { let json = json!({ "a": 1, "b": "hello", @@ -213,18 +235,15 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); + assert_eq!(rb.column(0).as_int64_arr(), &Int64Array::from_iter([1])); assert_eq!( - as_int64_array(rb.column(0)).unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - as_string_array(rb.column(1)), + rb.column(1).as_utf8_arr(), &StringArray::from_iter_values(["hello"]) ); } #[test] - fn test_basic_object_derive_schema_into_rb() { + fn basic_object_derive_schema_into_rb() { let json = json!({ "a": 1, "b": "hello", @@ -247,18 +266,15 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); + assert_eq!(rb.column(0).as_int64_arr(), &Int64Array::from_iter([1])); assert_eq!( - as_int64_array(rb.column(0)).unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - as_string_array(rb.column(1)), + rb.column(1).as_utf8_arr(), &StringArray::from_iter_values(["hello"]) ); } #[test] - fn test_basic_object_schema_mismatch() { + fn basic_object_schema_mismatch() { let json = json!({ "a": 1, "b": 1, // type mismatch @@ -272,10 +288,283 @@ mod tests { let req = TestRequest::default().to_http_request(); - dbg!(into_event_batch( + assert!(into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), &schema, - )); + ) + .is_err()); + } + + #[test] + fn empty_object() { + let json = json!({}); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Float64, true), + Field::new("c", DataType::Float64, true), + ]); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &schema, + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 3); + } + + #[test] + fn non_object_arr_is_err() { + let json = json!([1]); + + let req = TestRequest::default().to_http_request(); + + assert!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &Schema::empty(), + ) + .is_err()) + } + + #[test] + fn arr_with_null_into_rb() { + let json = json!([ + { + "c": null, + "b": "hello", + "a": null + }, + { + "a": 1, + "c": 1.22, + "b": "hello" + }, + { + "b": "hello", + "a": 1, + "c": null + }, + ]); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &Schema::empty(), + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_columns(), 6); + assert_eq!( + rb.column(0).as_int64_arr(), + &Int64Array::from(vec![None, Some(1), Some(1)]) + ); + assert_eq!( + rb.column(1).as_utf8_arr(), + &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) + ); + assert_eq!( + rb.column(2).as_float64_arr(), + &Float64Array::from(vec![None, Some(1.22), None,]) + ); + } + + #[test] + 
fn arr_with_null_derive_schema_into_rb() { + let json = json!([ + { + "c": null, + "b": "hello", + "a": null + }, + { + "a": 1, + "c": 1.22, + "b": "hello" + }, + { + "b": "hello", + "a": 1, + "c": null + }, + ]); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ]); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &schema, + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_columns(), 6); + assert_eq!( + rb.column(0).as_int64_arr(), + &Int64Array::from(vec![None, Some(1), Some(1)]) + ); + assert_eq!( + rb.column(1).as_utf8_arr(), + &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) + ); + assert_eq!( + rb.column(2).as_float64_arr(), + &Float64Array::from(vec![None, Some(1.22), None,]) + ); + } + + #[test] + fn arr_obj_ignore_all_null_field() { + let json = json!([ + { + "a": 1, + "b": "hello", + "c": null + }, + { + "a": 1, + "b": "hello", + "c": null + }, + { + "a": 1, + "b": "hello", + "c": null + }, + ]); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &Schema::empty(), + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_columns(), 5); + assert_eq!( + rb.column(0).as_int64_arr(), + &Int64Array::from(vec![Some(1), Some(1), Some(1)]) + ); + assert_eq!( + rb.column(1).as_utf8_arr(), + &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) + ); + } + + #[test] + fn arr_schema_mismatch() { + let json = json!([ + { + "a": null, + "b": "hello", + "c": 1.24 + }, + { + "a": 1, + "b": "hello", + "c": 1 + }, + { + "a": 1, + "b": "hello", + "c": null + }, + ]); + + let req = TestRequest::default().to_http_request(); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ]); + + assert!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &schema, + ) + .is_err()); + } + + #[test] + fn arr_obj_with_nested_type() { + let json = json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1}] + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1, "b": 2}] + }, + ]); + + let req = TestRequest::default().to_http_request(); + + let (_, rb) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + &Schema::empty(), + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 4); + assert_eq!(rb.num_columns(), 7); + assert_eq!( + rb.column(0).as_int64_arr(), + &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) + ); + assert_eq!( + rb.column(1).as_utf8_arr(), + &StringArray::from(vec![ + Some("hello"), + Some("hello"), + Some("hello"), + Some("hello") + ]) + ); + + let c_a = vec![None, None, Some(vec![Some(1i64)]), Some(vec![Some(1)])]; + let c_b = vec![None, None, None, Some(vec![Some(2i64)])]; + + assert_eq!( + rb.column(2).as_any().downcast_ref::().unwrap(), + &ListArray::from_iter_primitive::(c_a) + ); + + assert_eq!( + rb.column(3).as_any().downcast_ref::().unwrap(), + &ListArray::from_iter_primitive::(c_b) + ); } } From 6856f1098310ee2d3fb3e9005ccb73c4a94edf3f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:39:20 +0530 Subject: [PATCH 05/14] Clippy Fix --- server/src/event/format.rs | 2 
-- server/src/metadata.rs | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/event/format.rs b/server/src/event/format.rs index ba398977d..f398814fe 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use std::sync::Arc; use anyhow::{anyhow, Error as AnyError}; diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 13f7b4a33..0d3387f39 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -63,7 +63,6 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding // 3. When a stream is deleted (remove the entry from the map) // 4. When first event is sent to stream (update the schema) // 5. When set alert API is called (update the alert) -#[allow(clippy::all)] impl STREAM_INFO { pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> { let map = self.read().expect(LOCK_EXPECT); @@ -171,10 +170,10 @@ impl STREAM_INFO { stream.stats.add_ingestion_size(size); stream.stats.increase_event_by_n(num_rows); EVENTS_INGESTED - .with_label_values(&[stream_name.clone(), origin]) + .with_label_values(&[stream_name, origin]) .inc(); EVENTS_INGESTED_SIZE - .with_label_values(&[stream_name.clone(), origin]) + .with_label_values(&[stream_name, origin]) .add(size as i64); Ok(()) From c8395633c8375b7efd7973554abe650ddac4b4dc Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:40:22 +0530 Subject: [PATCH 06/14] Add header --- server/src/event/format.rs | 19 +++++++++++++++++++ server/src/event/format/json.rs | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/server/src/event/format.rs b/server/src/event/format.rs index f398814fe..b0554d4f9 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + use std::sync::Arc; use anyhow::{anyhow, Error as AnyError}; diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs index 59534b26a..523ffeaca 100644 --- a/server/src/event/format/json.rs +++ b/server/src/event/format/json.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + * + */ + #![allow(deprecated)] use anyhow::anyhow; From 3f8eacd4d954869901fbf85c332c331e52affcc9 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:41:13 +0530 Subject: [PATCH 07/14] Add header --- server/src/utils/arrow.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs index d27bbc1b0..d9db95b1e 100644 --- a/server/src/utils/arrow.rs +++ b/server/src/utils/arrow.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + use std::sync::Arc; use arrow_array::{Array, RecordBatch}; From 2f1a0a83b005cb43d05216384273b8a18cc8df3b Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:42:38 +0530 Subject: [PATCH 08/14] Format --- server/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/utils.rs b/server/src/utils.rs index fca14cc1d..ed5213b79 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -16,12 +16,12 @@ * */ +pub mod arrow; pub mod batch_adapter; pub mod header_parsing; pub mod json; pub mod uid; pub mod update; -pub mod arrow; use std::path::Path; From 50a26844cc9cf785698019330561e3acff069917 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 5 Apr 2023 21:56:36 +0530 Subject: [PATCH 09/14] Remove a tag --- server/src/handlers/http/ingest.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index cb480f5a8..6b9da01e1 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -179,7 +179,6 @@ mod tests { let req = TestRequest::default() .append_header((PREFIX_TAGS.to_string() + "A", "tag1")) - .append_header((PREFIX_TAGS.to_string() + "B", "tag2")) .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); @@ -206,7 +205,7 @@ mod tests { rb.column_by_name(event::DEFAULT_TAGS_KEY) .unwrap() .as_utf8_arr(), - &StringArray::from_iter_values(["a=tag1^b=tag2"]) + &StringArray::from_iter_values(["a=tag1"]) ); assert_eq!( rb.column_by_name(event::DEFAULT_METADATA_KEY) From 2bed3c40910331390289693969337497e967ea7a Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 6 Apr 2023 11:30:43 +0530 Subject: [PATCH 10/14] Remove Default --- server/src/option.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 717a3ea7e..ee440b507 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -142,12 +142,6 @@ impl Config { } } -impl Default for Config { - fn default() -> Self { - Self::new() - } -} - fn parseable_cli_command() -> Command { let local = Server::get_clap_command("local-store"); let local = ::augment_args_for_update(local); From 90d57b7d67dcdf3e36519b8c2732fe6712141ddd Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 6 Apr 2023 12:37:14 +0530 Subject: 
[PATCH 11/14] Add migration

---
 server/src/migration.rs | 37 ++++++---
 server/src/migration/schema_migration.rs | 76 +++++++++--------
 .../migration/stream_metadata_migration.rs | 33 +++++++-
 server/src/storage.rs | 7 +-
 4 files changed, 107 insertions(+), 46 deletions(-)

diff --git a/server/src/migration.rs b/server/src/migration.rs
index 67f38670a..5bde8c889 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -43,22 +43,37 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
     let stream_metadata: serde_json::Value =
         serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
 
-    let maybe_v1 = stream_metadata
+    let version = stream_metadata
         .as_object()
         .and_then(|meta| meta.get("version"))
         .and_then(|version| version.as_str());
 
-    if matches!(maybe_v1, Some("v1")) {
-        let new_stream_metadata = stream_metadata_migration::v1_v2(stream_metadata);
-        storage
-            .put_object(&path, to_bytes(&new_stream_metadata))
-            .await?;
+    match version {
+        Some("v1") => {
+            let new_stream_metadata = stream_metadata_migration::v1_v3(stream_metadata);
+            storage
+                .put_object(&path, to_bytes(&new_stream_metadata))
+                .await?;
 
-        let schema_path = RelativePathBuf::from_iter([stream, ".schema"]);
-        let schema = storage.get_object(&schema_path).await?;
-        let schema = serde_json::from_slice(&schema).ok();
-        let map = schema_migration::v1_v2(schema)?;
-        storage.put_object(&schema_path, to_bytes(&map)).await?;
+            let schema_path = RelativePathBuf::from_iter([stream, ".schema"]);
+            let schema = storage.get_object(&schema_path).await?;
+            let schema = serde_json::from_slice(&schema).ok();
+            let map = schema_migration::v1_v3(schema)?;
+            storage.put_object(&schema_path, to_bytes(&map)).await?;
+        }
+        Some("v2") => {
+            let new_stream_metadata = stream_metadata_migration::v2_v3(stream_metadata);
+            storage
+                .put_object(&path, to_bytes(&new_stream_metadata))
+                .await?;
+
+            let schema_path = RelativePathBuf::from_iter([stream, ".schema"]);
+            let schema = storage.get_object(&schema_path).await?;
+            let schema = serde_json::from_slice(&schema)?;
+            let map = schema_migration::v2_v3(schema)?;
+            storage.put_object(&schema_path, to_bytes(&map)).await?;
+        }
+        _ => (),
     }
 
     Ok(())
diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs
index 18a301ec3..9c9651fbb 100644
--- a/server/src/migration/schema_migration.rs
+++ b/server/src/migration/schema_migration.rs
@@ -19,34 +19,50 @@
 
 use std::collections::HashMap;
 
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use itertools::Itertools;
-
-pub(super) fn v1_v2(schema: Option<Schema>) -> anyhow::Result<HashMap<String, Schema>> {
-    let Some(schema) = schema else { return Ok(HashMap::new()) };
-    let schema = Schema::try_merge(vec![
-        Schema::new(vec![Field::new(
-            "p_timestamp",
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        )]),
-        schema,
-    ])?;
-
-    let list_of_fields = schema
-        .fields()
-        .iter()
-        // skip p_timestamp
-        .skip(1)
-        .map(|f| f.name())
-        .sorted();
-
-    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
-    list_of_fields.for_each(|field| hasher.update(field.as_bytes()));
-    let hash = hasher.digest();
-    let key = format!("{hash:x}");
-
-    let mut map = HashMap::new();
-    map.insert(key, schema);
-    Ok(map)
+use arrow_schema::{DataType, Field, Schema};
+use serde_json::Value;
+
+pub(super) fn v1_v3(schema: Option<Value>) -> anyhow::Result<Schema> {
+    if let Some(schema) = schema {
+        value_to_schema(schema)
+    } else {
+        Ok(Schema::empty())
+    }
+}
+
+pub(super) fn v2_v3(schemas: HashMap<String, Value>) -> anyhow::Result<Schema> {
+    
let mut derived_schemas = Vec::new(); + + for value in schemas.into_values() { + let schema = value_to_schema(value)?; + derived_schemas.push(schema); + } + + let mut schema = Schema::try_merge(derived_schemas)?; + schema.fields.sort_by(|a, b| a.name().cmp(b.name())); + Ok(schema) +} + +fn value_to_schema(schema: Value) -> Result { + let fields = schema + .as_object() + .expect("schema is an object") + .get("fields") + .expect("fields exists") + .as_array() + .expect("fields is an array"); + + let mut new_fields = Vec::new(); + + for field in fields { + let field = field.as_object().unwrap(); + let field_name: String = + serde_json::from_value(field.get("name").unwrap().clone()).unwrap(); + let field_dt: DataType = + serde_json::from_value(field.get("datatype").unwrap().clone()).unwrap(); + new_fields.push(Field::new(field_name, field_dt, true)); + } + new_fields.sort(); + + Ok(Schema::new(new_fields)) } diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index 386056482..4da6c96f0 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -19,7 +19,28 @@ use serde_json::{json, Value}; -pub fn v1_v2(mut stream_metadata: Value) -> Value { +use crate::storage; + +pub fn v1_v3(mut stream_metadata: Value) -> Value { + let default_stats = json!({ + "events": 0, + "ingestion": 0, + "storage": 0 + }); + let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); + stream_metadata_map.entry("stats").or_insert(default_stats); + stream_metadata_map.insert( + "version".to_owned(), + Value::String(storage::CURRENT_SCHEMA_VERSION.into()), + ); + stream_metadata_map.insert( + "objectstore-format".to_owned(), + Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), + ); + stream_metadata +} + +pub fn v2_v3(mut stream_metadata: Value) -> Value { let default_stats = json!({ "events": 0, "ingestion": 0, @@ -27,7 +48,13 @@ pub fn v1_v2(mut stream_metadata: Value) -> Value { }); let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); stream_metadata_map.entry("stats").or_insert(default_stats); - stream_metadata_map.insert("version".to_owned(), Value::String("v2".into())); - stream_metadata_map.insert("objectstore-format".to_owned(), Value::String("v2".into())); + stream_metadata_map.insert( + "version".to_owned(), + Value::String(storage::CURRENT_SCHEMA_VERSION.into()), + ); + stream_metadata_map.insert( + "objectstore-format".to_owned(), + Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), + ); stream_metadata } diff --git a/server/src/storage.rs b/server/src/storage.rs index 7c2c28963..2ec2bf473 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -67,6 +67,9 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000; // const PERMISSIONS_READ_WRITE: &str = "readwrite"; const ACCESS_ALL: &str = "all"; +pub const CURRENT_OBJECT_STORE_VERSION: &str = "v3"; +pub const CURRENT_SCHEMA_VERSION: &str = "v3"; + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct ObjectStoreFormat { /// Version of schema registry @@ -113,8 +116,8 @@ impl Permisssion { impl Default for ObjectStoreFormat { fn default() -> Self { Self { - version: "v2".to_string(), - objectstore_format: "v1".to_string(), + version: CURRENT_SCHEMA_VERSION.to_string(), + objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), created_at: Local::now().to_rfc3339(), owner: Owner::new("".to_string(), "".to_string()), permissions: 
vec![Permisssion::new("parseable".to_string())],

From 89e158ab17181b990503fa6cbdc905209c1aaa15 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 6 Apr 2023 13:10:09 +0530
Subject: [PATCH 12/14] Fix

---
 server/src/migration/schema_migration.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs
index 9c9651fbb..e65dec044 100644
--- a/server/src/migration/schema_migration.rs
+++ b/server/src/migration/schema_migration.rs
@@ -59,10 +59,10 @@ fn value_to_schema(schema: Value) -> Result {
         let field_name: String =
             serde_json::from_value(field.get("name").unwrap().clone()).unwrap();
         let field_dt: DataType =
-            serde_json::from_value(field.get("datatype").unwrap().clone()).unwrap();
+            serde_json::from_value(field.get("data_type").unwrap().clone()).unwrap();
         new_fields.push(Field::new(field_name, field_dt, true));
     }
-    new_fields.sort();
+    new_fields.sort_by(|a, b| a.name().cmp(b.name()));
 
     Ok(Schema::new(new_fields))
 }

From 83a8fd33702f7ff192fe17c66a3bdf7e99687fb7 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 6 Apr 2023 20:20:53 +0530
Subject: [PATCH 13/14] Schema fix

---
 server/src/handlers/http/logstream.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 9274b2403..cd42e3e62 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -72,8 +72,6 @@ pub async fn list(_: HttpRequest) -> impl Responder {
 
 pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let schema = STREAM_INFO.schema(&stream_name)?;
-    let schema = serde_json::to_value(&schema).unwrap();
-
     Ok((web::Json(schema), StatusCode::OK))
 }

From 8740d7f8a20a7aa86b96aefa5dd7483b542dd7c4 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Sat, 8 Apr 2023 22:59:52 +0530
Subject: [PATCH 14/14] Fix

---
 server/src/event.rs | 57 +++++++++++++++++++++++++++++-
 server/src/handlers/http/ingest.rs | 2 +-
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index 83e7d76f2..da52c1b7d 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -109,11 +109,12 @@ pub fn get_schema_key(fields: &Vec<Field>) -> String {
 pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), EventError> {
     let mut stream_metadata = metadata::STREAM_INFO.write().expect("lock poisoned");
 
-    let schema = Schema::try_merge(vec![
+    let mut schema = Schema::try_merge(vec![
         schema.as_ref().clone(),
         stream_metadata.get_unchecked(stream_name).as_ref().clone(),
     ])
     .unwrap();
+    schema.fields.sort_by(|a, b| a.name().cmp(b.name()));
 
     stream_metadata.set_unchecked(stream_name, Arc::new(schema));
     Ok(())
@@ -158,3 +159,57 @@ pub mod error {
         ObjectStorage(#[from] ObjectStorageError),
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow_array::RecordBatch;
+    use arrow_schema::{DataType, Field, Schema};
+
+    use super::Event;
+
+    fn test_rb(fields: Vec<Field>) -> RecordBatch {
+        RecordBatch::new_empty(Arc::new(Schema::new(fields)))
+    }
+
+    fn test_event(fields: Vec<Field>) -> Event {
+        Event {
+            stream_name: "".to_string(),
+            rb: test_rb(fields),
+            origin_format: "none",
+            origin_size: 0,
+        }
+    }
+
+    #[test]
+    fn new_field_is_new_event() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int64, true),
+        ]);
+
+        let new_event = test_event(vec![
+            Field::new("a",
DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ]); + + assert!(new_event.is_first_event(&schema)); + } + + #[test] + fn same_field_not_is_new_event() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ]); + + let new_event = test_event(vec![ + Field::new("a", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ]); + + assert!(!new_event.is_first_event(&schema)); + } +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 6b9da01e1..d3c4c99b5 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -172,9 +172,9 @@ mod tests { #[test] fn basic_object_into_rb() { let json = json!({ + "c": 4.23, "a": 1, "b": "hello", - "c": 4.23 }); let req = TestRequest::default()