From 59512883ebb776ad9bb024f2b6a0d278b3a8845b Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 22 Jun 2023 15:53:25 +0530 Subject: [PATCH 1/2] Move to datafusion 26 and arrow 40 --- Cargo.lock | 170 +++++++++++------------ server/Cargo.toml | 25 ++-- server/src/alerts/mod.rs | 2 +- server/src/event.rs | 8 +- server/src/event/format.rs | 32 ++--- server/src/event/format/json.rs | 44 +++--- server/src/handlers/http/ingest.rs | 135 +++++++++--------- server/src/metadata.rs | 15 +- server/src/migration/schema_migration.rs | 7 +- server/src/response.rs | 4 +- server/src/storage/localfs.rs | 2 +- server/src/storage/s3.rs | 2 +- server/src/utils/arrow.rs | 10 +- server/src/utils/arrow/merged_reader.rs | 4 +- 14 files changed, 241 insertions(+), 219 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb5cc2621..6110d9eae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,7 +71,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "zstd 0.12.3+zstd.1.5.2", + "zstd", ] [[package]] @@ -301,9 +301,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.20" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" dependencies = [ "memchr", ] @@ -323,6 +323,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -366,9 +372,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990dfa1a9328504aa135820da1c95066537b69ad94c04881b785f64328e0fa6b" +checksum = "6619cab21a0cdd8c9b9f1d9e09bfaa9b1974e5ef809a6566aef0b998caf38ace" dependencies = [ "ahash 0.8.3", "arrow-arith", @@ -388,9 +394,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2b2e52de0ab54173f9b08232b7184c26af82ee7ab4ac77c83396633c90199fa" +checksum = "e0dc95485623a76e00929bda8caa40c1f838190952365c4f43a7b9ae86d03e94" dependencies = [ "arrow-array", "arrow-buffer", @@ -403,9 +409,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10849b60c17dbabb334be1f4ef7550701aa58082b71335ce1ed586601b2f423" +checksum = "3267847f53d3042473cfd2c769afd8d74a6d7d201fc3a34f5cb84c0282ef47a7" dependencies = [ "ahash 0.8.3", "arrow-buffer", @@ -420,9 +426,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0746ae991b186be39933147117f8339eb1c4bbbea1c8ad37e7bf5851a1a06ba" +checksum = "c5f66553e66e120ac4b21570368ee9ebf35ff3f5399f872b0667699e145678f5" dependencies = [ "half", "num", @@ -430,9 +436,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88897802515d7b193e38b27ddd9d9e43923d410a9e46307582d756959ee9595" +checksum = 
"65e6f3579dbf0d97c683d451b2550062b0f0e62a3169bf74238b5f59f44ad6d8" dependencies = [ "arrow-array", "arrow-buffer", @@ -447,9 +453,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c8220d9741fc37961262710ceebd8451a5b393de57c464f0267ffdda1775c0a" +checksum = "373579c4c1a8f5307d3125b7a89c700fcf8caf85821c77eb4baab3855ae0aba5" dependencies = [ "arrow-array", "arrow-buffer", @@ -466,9 +472,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "533f937efa1aaad9dc86f6a0e382c2fa736a4943e2090c946138079bdf060cef" +checksum = "61bc8df9912cca6642665fdf989d6fa0de2570f18a7f709bcf59d29de96d2097" dependencies = [ "arrow-buffer", "arrow-schema", @@ -478,9 +484,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18b75296ff01833f602552dff26a423fc213db8e5049b540ca4a00b1c957e41c" +checksum = "0105dcf5f91daa7182d87b713ee0b32b3bfc88e0c48e7dc3e9d6f1277a07d1ae" dependencies = [ "arrow-array", "arrow-buffer", @@ -492,9 +498,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e501d3de4d612c90677594896ca6c0fa075665a7ff980dc4189bb531c17e19f6" +checksum = "e73134fb5b5ec8770f8cbb214c2c487b2d350081e403ca4eeeb6f8f5e19846ac" dependencies = [ "arrow-array", "arrow-buffer", @@ -506,14 +512,15 @@ dependencies = [ "indexmap", "lexical-core", "num", + "serde", "serde_json", ] [[package]] name = "arrow-ord" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d2671eb3793f9410230ac3efb0e6d36307be8a2dac5fad58ac9abde8e9f01e" +checksum = "89f25bc66e18d4c2aa1fe2f9bb03e2269da60e636213210385ae41a107f9965a" dependencies = [ "arrow-array", "arrow-buffer", @@ -526,9 +533,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc11fa039338cebbf4e29cf709c8ac1d6a65c7540063d4a25f991ab255ca85c8" +checksum = "1095ff85ea4f5ff02d17b30b089de31b51a50be01c6b674f0a0509ab771232f1" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -541,18 +548,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d04f17f7b86ded0b5baf98fe6123391c4343e031acc3ccc5fa604cc180bff220" +checksum = "25187bbef474151a2e4ddec67b9e34bda5cbfba292dc571392fa3a1f71ff5a82" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "163e35de698098ff5f5f672ada9dc1f82533f10407c7a11e2cd09f3bcf31d18a" +checksum = "fd0d4ee884aec3aa05e41478e3cd312bf609de9babb5d187a43fb45931da4da4" dependencies = [ "arrow-array", "arrow-buffer", @@ -563,9 +570,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfdfbed1b10209f0dc68e6aa4c43dc76079af65880965c7c3b73f641f23d4aba" +checksum = "d6d71c3ffe4c07e66ce8fdc6aed5b00e0e60c5144911879b10546f5b72d8fa1c" dependencies = [ "arrow-array", "arrow-buffer", @@ 
-578,9 +585,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.3.15" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11" dependencies = [ "bzip2", "flate2", @@ -590,8 +597,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.11.2+zstd.1.5.2", - "zstd-safe 5.0.2+zstd.1.5.2", + "zstd", + "zstd-safe", ] [[package]] @@ -1138,13 +1145,13 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ + "android-tzdata", "iana-time-zone", "js-sys", - "num-integer", "num-traits", "serde", "time 0.1.43", @@ -1541,12 +1548,14 @@ dependencies = [ [[package]] name = "datafusion" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bdb93fee4f30368f1f71bfd5cd28882ec9fab0183db7924827b76129d33227c" +checksum = "9992c267436551d40b52d65289b144712e7b0ebdc62c8c859fd1574e5f73efbb" dependencies = [ "ahash 0.8.3", "arrow", + "arrow-array", + "arrow-schema", "async-compression", "async-trait", "bytes", @@ -1584,14 +1593,14 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.12.3+zstd.1.5.2", + "zstd", ] [[package]] name = "datafusion-common" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e82401ce129e601d406012b6d718f8978ba84c386e1c342fa155877120d68824" +checksum = "c3be97f7a7c720cdbb71e9eeabf814fa6ad8102b9022390f6cac74d3b4af6392" dependencies = [ "arrow", "arrow-array", @@ -1604,9 +1613,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b08b2078aed21a27239cd93f3015e492a58b0d50ebeeaf8d2236cf108ef583ce" +checksum = "c77c4b14b809b0e4c5bb101b6834504f06cdbb0d3c643400c61d0d844b33264e" dependencies = [ "dashmap", "datafusion-common", @@ -1622,21 +1631,24 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b5b977ce9695fb4c67614266ec57f384fc11e9a9f9b3e6d0e62b9c5a9f2c1f" +checksum = "e6ec7409bd45cf4fae6395d7d1024c8a97e543cadc88363e405d2aad5330e5e7" dependencies = [ "ahash 0.8.3", "arrow", "datafusion-common", + "lazy_static", "sqlparser", + "strum", + "strum_macros", ] [[package]] name = "datafusion-optimizer" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0b2bb9e73ed778d1bc5af63a270f0154bf6eab5099c77668a6362296888e46b" +checksum = "64b537c93f87989c212db92a448a0f5eb4f0995e27199bb7687ae94f8b64a7a8" dependencies = [ "arrow", "async-trait", @@ -1652,9 +1664,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80cd8ea5ab0a07b1b2a3e17d5909f1b1035bd129ffeeb5c66842a32e682f8f79" +checksum = "f60ee3f53340fdef36ee54d9e12d446ae2718b1d0196ac581f791d34808ec876" dependencies = [ "ahash 0.8.3", "arrow", @@ -1672,6 +1684,7 @@ dependencies = [ "indexmap", "itertools", 
"lazy_static", + "libc", "md-5", "paste", "petgraph", @@ -1684,9 +1697,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a95d6badab19fd6e9195fdc5209ac0a7e5ce9bcdedc67767b9ffc1b4e645760" +checksum = "d58fc64058aa3bcb00077a0d19474a0d584d31dec8c7ac3406868f485f659af9" dependencies = [ "arrow", "datafusion-common", @@ -1696,9 +1709,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "22.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37a78f8fc67123c4357e63bc0c87622a2a663d26f074958d749a633d0ecde90f" +checksum = "1531f0314151a34bf6c0a83c7261525688b7c729876f53e7896b8f4ca8f57d07" dependencies = [ "arrow", "arrow-schema", @@ -2428,9 +2441,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.139" +version = "0.2.146" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" [[package]] name = "libm" @@ -2912,9 +2925,9 @@ dependencies = [ [[package]] name = "parquet" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "321a15f8332645759f29875b07f8233d16ed8ec1b3582223de81625a9f8506b7" +checksum = "d6a656fcc17e641657c955742c689732684e096f790ff30865d9f8dcc39f7c4a" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -2934,13 +2947,14 @@ dependencies = [ "lz4", "num", "num-bigint", + "object_store", "paste", "seq-macro", "snap", "thrift", "tokio", "twox-hash", - "zstd 0.12.3+zstd.1.5.2", + "zstd", ] [[package]] @@ -3381,9 +3395,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.3" +version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d" +checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" dependencies = [ "aho-corasick", "memchr", @@ -3398,9 +3412,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" [[package]] name = "regex-syntax" -version = "0.6.29" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" [[package]] name = "relative-path" @@ -3841,9 +3855,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "sqlparser" -version = "0.32.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0366f270dbabb5cc2e4c88427dc4c08bba144f81e32fbd459a013f26a4d16aa0" +checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59" dependencies = [ "log", "sqlparser_derive", @@ -3894,6 +3908,9 @@ name = "strum" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" +dependencies = [ + "strum_macros", +] [[package]] name = "strum_macros" @@ -4784,32 +4801,13 @@ dependencies = [ "flate2", ] -[[package]] -name = "zstd" -version = "0.11.2+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" -dependencies = [ - "zstd-safe 
5.0.2+zstd.1.5.2", -] - [[package]] name = "zstd" version = "0.12.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" dependencies = [ - "zstd-safe 6.0.4+zstd.1.5.4", -] - -[[package]] -name = "zstd-safe" -version = "5.0.2+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" -dependencies = [ - "libc", - "zstd-sys", + "zstd-safe", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index d91dce41a..1f793e51e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -15,11 +15,11 @@ actix-cors = "0.6" actix-web-prometheus = { version = "0.1" } prometheus = { version = "0.13", features = ["process"] } anyhow = { version = "1.0", features = ["backtrace"] } -arrow-schema = { version = "36.0.0", features = ["serde"] } -arrow-array = { version = "36.0.0" } -arrow-json = "36.0.0" -arrow-ipc = "36.0.0" -arrow-select = "36.0.0" +arrow-schema = { version = "40.0.0", features = ["serde"] } +arrow-array = { version = "40.0.0" } +arrow-json = "40.0.0" +arrow-ipc = "40.0.0" +arrow-select = "40.0.0" async-trait = "0.1" base64 = "0.21" bytes = "1.4" @@ -35,7 +35,7 @@ clap = { version = "4.1", default-features = false, features = [ "error-context", ] } crossterm = "0.26" -datafusion = "22.0.0" +datafusion = "26.0.0" object_store = { version = "0.5.6", features = ["aws", "aws_profile"] } derive_more = "0.99" env_logger = "0.10" @@ -49,7 +49,12 @@ sysinfo = "0.28.4" hostname = "0.3" rand = "0.8" relative-path = { version = "1.7", features = ["serde"] } -reqwest = { version = "0.11", default_features=false, features=["rustls", "json", "hyper-rustls", "tokio-rustls"]} +reqwest = { version = "0.11", default_features = false, features = [ + "rustls", + "json", + "hyper-rustls", + "tokio-rustls", +] } rustls = "0.20" rustls-pemfile = "1.0" semver = "1.0" @@ -70,10 +75,10 @@ ulid = { version = "1.0", features = ["serde"] } hex = "0.4" itertools = "0.10" xxhash-rust = { version = "0.8", features = ["xxh3"] } -xz2 = { version = "*", features=["static"] } -bzip2 = { version = "*", features=["static"] } +xz2 = { version = "*", features = ["static"] } +bzip2 = { version = "*", features = ["static"] } once_cell = "1.17.1" -parquet = "36.0.0" +parquet = "40.0.0" pyroscope = { version = "0.5.3", optional = true } pyroscope_pprofrs = { version = "0.2", optional = true } uptime_lib = "0.2.2" diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 8dc7d53b9..97458223f 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -136,7 +136,7 @@ impl Message { // checks if message (with a column name) is valid (i.e. 
the column name is present in the schema)
     pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
         if let Some(col) = column {
-            return get_field(schema, col).is_some();
+            return get_field(&schema.fields, col).is_some();
         }
         true
     }
diff --git a/server/src/event.rs b/server/src/event.rs
index 152134d0e..b3877126b 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -21,7 +21,7 @@ pub mod format;
 mod writer;
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Schema};
+use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
 
 use std::sync::Arc;
@@ -85,7 +85,7 @@ impl Event {
     }
 }
 
-pub fn get_schema_key(fields: &[Field]) -> String {
+pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     // Fields must be sorted
     let mut hasher = xxhash_rust::xxh3::Xxh3::new();
     for field in fields.iter().sorted_by_key(|v| v.name()) {
@@ -102,10 +102,10 @@ pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), Event
         .get_mut(stream_name)
         .expect("map has entry for this stream name")
         .schema;
-    let current_schema = Schema::new(map.values().cloned().collect());
+    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
     let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
     map.clear();
-    map.extend(schema.fields.into_iter().map(|f| (f.name().clone(), f)));
+    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
 
     Ok(())
 }
diff --git a/server/src/event/format.rs b/server/src/event/format.rs
index e337f991e..1fe4d0591 100644
--- a/server/src/event/format.rs
+++ b/server/src/event/format.rs
@@ -39,12 +39,12 @@ pub trait EventFormat: Sized {
     type Data;
     fn to_data(
         self,
-        schema: &HashMap<String, Field>,
-    ) -> Result<(Self::Data, Schema, bool, Tags, Metadata), AnyError>;
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), AnyError>;
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
     fn into_recordbatch(
         self,
-        schema: &HashMap<String, Field>,
+        schema: HashMap<String, Arc<Field>>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;
 
@@ -67,28 +67,28 @@ pub trait EventFormat: Sized {
         };
 
         // add the p_timestamp field to the event schema to the 0th index
-        schema.fields.insert(
+        schema.insert(
             0,
-            Field::new(
+            Arc::new(Field::new(
                 DEFAULT_TIMESTAMP_KEY,
                 DataType::Timestamp(TimeUnit::Millisecond, None),
                 true,
-            ),
+            )),
         );
 
         // p_tags and p_metadata are added to the end of the schema
-        let tags_index = schema.fields.len();
+        let tags_index = schema.len();
         let metadata_index = tags_index + 1;
-        schema
-            .fields
-            .push(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
-        schema
-            .fields
-            .push(Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true));
+        schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
+        schema.push(Arc::new(Field::new(
+            DEFAULT_METADATA_KEY,
+            DataType::Utf8,
+            true,
+        )));
 
         // prepare the record batch and new fields to be added
-        let schema_ref = Arc::new(schema);
-        let rb = Self::decode(data, Arc::clone(&schema_ref))?;
+        let schema = Arc::new(Schema::new(schema));
+        let rb = Self::decode(data, schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
             StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
@@ -96,7 +96,7 @@ pub trait EventFormat: Sized {
 
         // modify the record batch to add fields to respective indexes
         let rb = utils::arrow::replace_columns(
-            Arc::clone(&schema_ref),
+            Arc::clone(&schema),
             rb,
             &[0, tags_index, metadata_index],
             &[
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index 26b45a0ad..9e71c87cb 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -21,19 +21,19 @@
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
-use arrow_json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions};
-use arrow_schema::{DataType, Field, Schema};
+use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
+use arrow_schema::{DataType, Field, Fields, Schema};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use serde_json::Value;
 
 use std::{collections::HashMap, sync::Arc};
 
-use super::EventFormat;
+use super::{EventFormat, Metadata, Tags};
 use crate::utils::{arrow::get_field, json::flatten_json_body};
 
 pub struct Event {
     pub data: Value,
-    pub tags: String,
-    pub metadata: String,
+    pub tags: Tags,
+    pub metadata: Metadata,
 }
 
 impl EventFormat for Event {
@@ -43,10 +43,9 @@ impl EventFormat for Event {
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
         self,
-        schema: &HashMap<String, Field>,
-    ) -> Result<(Self::Data, Schema, bool, String, String), anyhow::Error> {
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
         let data = flatten_json_body(self.data)?;
-
         let stream_schema = schema;
 
         // incoming event may be a single json or a json array
@@ -63,18 +62,18 @@ impl EventFormat for Event {
             collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");
 
         let mut is_first = false;
-        let schema = match derive_arrow_schema(stream_schema, fields) {
+        let schema = match derive_arrow_schema(&stream_schema, fields) {
             Ok(schema) => schema,
             Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
                 Ok(infer_schema) => {
                     if let Err(err) = Schema::try_merge(vec![
-                        Schema::new(stream_schema.values().cloned().collect()),
+                        Schema::new(stream_schema.values().cloned().collect::<Fields>()),
                         infer_schema.clone(),
                     ]) {
                         return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
{:?}", err)); } is_first = true; - infer_schema + infer_schema.fields.iter().cloned().collect() } Err(err) => { return Err(anyhow!( @@ -100,13 +99,13 @@ impl EventFormat for Event { // Convert the Data type (defined above) to arrow record batch fn decode(data: Self::Data, schema: Arc) -> Result { let array_capacity = round_upto_multiple_of_64(data.len()); - let value_iter: &mut (dyn Iterator) = &mut data.into_iter(); + let mut reader = ReaderBuilder::new(schema) + .with_batch_size(array_capacity) + .with_coerce_primitive(false) + .build_decoder()?; - let reader = Decoder::new( - schema, - DecoderOptions::new().with_batch_size(array_capacity), - ); - match reader.next_batch(&mut value_iter.map(Ok)) { + reader.serialize(&data)?; + match reader.flush() { Ok(Some(recordbatch)) => Ok(recordbatch), Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)), Ok(None) => unreachable!("all records are added to one rb"), @@ -116,14 +115,17 @@ impl EventFormat for Event { // Returns arrow schema with the fields that are present in the request body // This schema is an input to convert the request body to arrow record batch -fn derive_arrow_schema(schema: &HashMap, fields: Vec<&str>) -> Result { +fn derive_arrow_schema( + schema: &HashMap>, + fields: Vec<&str>, +) -> Result>, ()> { let mut res = Vec::with_capacity(fields.len()); let fields = fields.into_iter().map(|field_name| schema.get(field_name)); for field in fields { let Some(field) = field else { return Err(()) }; res.push(field.clone()) } - Ok(Schema::new(res)) + Ok(res) } fn collect_keys<'a>(values: impl Iterator) -> Result, ()> { @@ -145,12 +147,12 @@ fn collect_keys<'a>(values: impl Iterator) -> Result bool { +fn fields_mismatch(schema: &Vec>, body: &Value) -> bool { for (name, val) in body.as_object().expect("body is of object variant") { if val.is_null() { continue; } - let Some(field) = get_field(schema, name) else { return true }; + let Some(field) = get_field(&schema, name) else { return true }; if !valid_type(field.data_type(), val) { return true; } diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 6edd9efe8..101dc8fb7 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -17,6 +17,7 @@ */ use std::collections::HashMap; +use std::sync::Arc; use actix_web::http::header::ContentType; use actix_web::{HttpRequest, HttpResponse}; @@ -64,10 +65,11 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { let (size, rb, is_first_event) = { let hash_map = STREAM_INFO.read().unwrap(); - let schema = &hash_map + let schema = hash_map .get(&stream_name) .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .schema; + .schema + .clone(); into_event_batch(req, body, schema)? 
     };
 
@@ -87,7 +89,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
 fn into_event_batch(
     req: HttpRequest,
     body: Bytes,
-    schema: &HashMap<String, Field>,
+    schema: HashMap<String, Arc<Field>>,
 ) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
@@ -140,7 +142,7 @@ impl actix_web::ResponseError for PostError {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
+    use std::{collections::HashMap, sync::Arc};
 
     use actix_web::test::TestRequest;
     use arrow_array::{
@@ -177,6 +179,10 @@ mod tests {
         }
     }
 
+    fn fields_to_map(iter: impl Iterator<Item = Field>) -> HashMap<String, Arc<Field>> {
+        iter.map(|x| (x.name().clone(), Arc::new(x))).collect()
+    }
+
     #[test]
     fn basic_object_into_rb() {
         let json = json!({
@@ -193,7 +199,7 @@ mod tests {
         let (size, rb, _) = into_event_batch(
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &HashMap::default(),
+            HashMap::default(),
         )
         .unwrap();
@@ -239,7 +245,7 @@ mod tests {
         let (_, rb, _) = into_event_batch(
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &HashMap::default(),
+            HashMap::default(),
         )
         .unwrap();
@@ -262,20 +268,19 @@ mod tests {
             "b": "hello",
         });
 
-        let schema = HashMap::from([
-            ("a".to_string(), Field::new("a", DataType::Int64, true)),
-            ("b".to_string(), Field::new("b", DataType::Utf8, true)),
-            ("c".to_string(), Field::new("c", DataType::Float64, true)),
-        ]);
+        let schema = fields_to_map(
+            [
+                Field::new("a", DataType::Int64, true),
+                Field::new("b", DataType::Utf8, true),
+                Field::new("c", DataType::Float64, true),
+            ]
+            .into_iter(),
+        );
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &schema,
-        )
-        .unwrap();
+        let (_, rb, _) =
+            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -296,40 +301,40 @@ mod tests {
             "b": 1, // type mismatch
         });
 
-        let schema = HashMap::from([
-            ("a".to_string(), Field::new("a", DataType::Int64, true)),
-            ("b".to_string(), Field::new("b", DataType::Utf8, true)),
-            ("c".to_string(), Field::new("c", DataType::Float64, true)),
-        ]);
+        let schema = fields_to_map(
+            [
+                Field::new("a", DataType::Int64, true),
+                Field::new("b", DataType::Utf8, true),
+                Field::new("c", DataType::Float64, true),
+            ]
+            .into_iter(),
+        );
 
         let req = TestRequest::default().to_http_request();
 
-        assert!(into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &schema,
-        )
-        .is_err());
+        assert!(
+            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
+                .is_err()
+        );
     }
 
     #[test]
     fn empty_object() {
         let json = json!({});
 
-        let schema = HashMap::from([
-            ("a".to_string(), Field::new("a", DataType::Int64, true)),
-            ("b".to_string(), Field::new("b", DataType::Utf8, true)),
-            ("c".to_string(), Field::new("c", DataType::Float64, true)),
-        ]);
+        let schema = fields_to_map(
+            [
+                Field::new("a", DataType::Int64, true),
+                Field::new("b", DataType::Utf8, true),
+                Field::new("c", DataType::Float64, true),
+            ]
+            .into_iter(),
+        );
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &schema,
-        )
-        .unwrap();
+        let (_, rb, _) =
+            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -344,7 +349,7 @@
         assert!(into_event_batch(
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &HashMap::default(),
+            HashMap::default(),
         )
         .is_err())
     }
@@ -374,7 +379,7 @@
         let (_, rb, _) = into_event_batch(
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &HashMap::default(),
+            HashMap::default(),
         )
         .unwrap();
@@ -414,19 +419,18 @@
             },
         ]);
 
-        let schema = HashMap::from([
-            ("a".to_string(), Field::new("a", DataType::Int64, true)),
-            ("b".to_string(), Field::new("b", DataType::Utf8, true)),
-            ("c".to_string(), Field::new("c", DataType::Float64, true)),
-        ]);
+        let schema = fields_to_map(
+            [
+                Field::new("a", DataType::Int64, true),
+                Field::new("b", DataType::Utf8, true),
+                Field::new("c", DataType::Float64, true),
+            ]
+            .into_iter(),
+        );
 
         let req = TestRequest::default().to_http_request();
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &schema,
-        )
-        .unwrap();
+        let (_, rb, _) =
+            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -469,7 +473,7 @@
         let (_, rb, _) = into_event_batch(
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &HashMap::default(),
+            HashMap::default(),
         )
         .unwrap();
@@ -507,18 +511,19 @@
         let req = TestRequest::default().to_http_request();
 
-        let schema = HashMap::from([
-            ("a".to_string(), Field::new("a", DataType::Int64, true)),
-            ("b".to_string(), Field::new("b", DataType::Utf8, true)),
-            ("c".to_string(), Field::new("c", DataType::Float64, true)),
-        ]);
+        let schema = fields_to_map(
+            [
+                Field::new("a", DataType::Int64, true),
+                Field::new("b", DataType::Utf8, true),
+                Field::new("c", DataType::Float64, true),
+            ]
+            .into_iter(),
+        );
 
-        assert!(into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &schema,
-        )
-        .is_err());
+        assert!(
+            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
+                .is_err()
+        );
     }
 
     #[test]
@@ -549,7 +554,7 @@
         let (_, rb, _) = into_event_batch(
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
-            &HashMap::default(),
+            HashMap::default(),
         )
         .unwrap();
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index a6724c378..81855f5c9 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -17,7 +17,7 @@
  */
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Schema};
+use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
 use once_cell::sync::Lazy;
 use std::collections::HashMap;
@@ -40,7 +40,7 @@ pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);
 
 #[derive(Debug, Default)]
 pub struct LogStreamMetadata {
-    pub schema: HashMap<String, Field>,
+    pub schema: HashMap<String, Arc<Field>>,
     pub alerts: Alerts,
 }
 
@@ -89,7 +89,7 @@ impl StreamInfo {
         // sort fields on read from hashmap as order of fields can differ.
        // This provides a stable output order if schema is same between calls to this function
-        let fields = schema
+        let fields: Fields = schema
             .values()
             .sorted_by_key(|field| field.name())
             .cloned()
@@ -133,8 +133,13 @@ impl StreamInfo {
         let schema = storage.get_schema(&stream.name).await?;
         let schema = update_schema_from_staging(&stream.name, schema);
-        let schema =
-            HashMap::from_iter(schema.fields.into_iter().map(|v| (v.name().to_owned(), v)));
+        let schema = HashMap::from_iter(
+            schema
+                .fields
+                .iter()
+                .map(|v| (v.name().to_owned(), v.clone())),
+        );
+
         let metadata = LogStreamMetadata { schema, alerts };
 
         let mut map = self.write().expect(LOCK_EXPECT);
diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs
index e65dec044..632d8b10b 100644
--- a/server/src/migration/schema_migration.rs
+++ b/server/src/migration/schema_migration.rs
@@ -38,9 +38,10 @@ pub(super) fn v2_v3(schemas: HashMap<String, Value>) -> anyhow::Result<Schema>
         derived_schemas.push(schema);
     }
 
-    let mut schema = Schema::try_merge(derived_schemas)?;
-    schema.fields.sort_by(|a, b| a.name().cmp(b.name()));
-    Ok(schema)
+    let schema = Schema::try_merge(derived_schemas)?;
+    let mut fields: Vec<_> = schema.fields.iter().cloned().collect();
+    fields.sort_by(|a, b| a.name().cmp(b.name()));
+    Ok(Schema::new(fields))
 }
 
 fn value_to_schema(schema: Value) -> Result<Schema, serde_json::Error> {
diff --git a/server/src/response.rs b/server/src/response.rs
index 02966e5d0..da44e95f8 100644
--- a/server/src/response.rs
+++ b/server/src/response.rs
@@ -42,8 +42,8 @@ impl QueryResponse {
     pub fn to_http(&self) -> impl Responder {
         log::info!("{}", "Returning query results");
-        let mut json_records = record_batches_to_json_rows(&self.records).unwrap();
-
+        let records: Vec<&RecordBatch> = self.records.iter().collect();
+        let mut json_records = record_batches_to_json_rows(&records).unwrap();
         if self.fill_null {
             for map in &mut json_records {
                 for field in &self.fields {
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index ce037a32c..02cd1bc03 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -222,7 +222,7 @@ impl ObjectStorage for LocalFS {
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
         let listing_options = ListingOptions {
             file_extension: ".parquet".to_string(),
-            file_sort_order: None,
+            file_sort_order: Vec::new(),
             infinite_source: false,
             format: Arc::new(file_format),
             table_partition_cols: vec![],
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index c6356b32b..2e226f5c3 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -474,7 +474,7 @@ impl ObjectStorage for S3 {
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
         let listing_options = ListingOptions {
             file_extension: ".parquet".to_string(),
-            file_sort_order: None,
+            file_sort_order: Vec::default(),
             infinite_source: false,
             format: Arc::new(file_format),
             table_partition_cols: vec![],
diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs
index 812151844..cecdaae8b 100644
--- a/server/src/utils/arrow.rs
+++ b/server/src/utils/arrow.rs
@@ -81,6 +81,12 @@ mod tests {
     }
 }
 
-pub fn get_field<'a>(schema: &'a Schema, name: &str) -> Option<&'a arrow_schema::Field> {
-    schema.fields.iter().find(|field| field.name() == name)
+pub fn get_field<'a>(
+    fields: &'a [impl AsRef<Field>],
+    name: &str,
+) -> Option<&'a arrow_schema::Field> {
+    fields
+        .iter()
+        .map(|x| x.as_ref())
+        .find(|field| field.name() == name)
 }
diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs
index 3ce701193..102e4b7cf 100644
--- a/server/src/utils/arrow/merged_reader.rs
+++ b/server/src/utils/arrow/merged_reader.rs
@@ -17,7 +17,7 @@
  *
  */
 
-use std::{fs::File, path::PathBuf};
+use std::{fs::File, io::BufReader, path::PathBuf};
 
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_ipc::reader::StreamReader;
@@ -29,7 +29,7 @@ use crate::event::DEFAULT_TIMESTAMP_KEY;
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
-    pub readers: Vec<StreamReader<File>>,
+    pub readers: Vec<StreamReader<BufReader<File>>>,
 }
 
 impl MergedRecordReader {

From 15fd29857d7bb8e97aae3fa40bf3758ac54e8b64 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 22 Jun 2023 17:21:33 +0530
Subject: [PATCH 2/2] Clippy Fix

---
 server/src/event/format.rs      | 4 +++-
 server/src/event/format/json.rs | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/server/src/event/format.rs b/server/src/event/format.rs
index 1fe4d0591..e55474190 100644
--- a/server/src/event/format.rs
+++ b/server/src/event/format.rs
@@ -32,15 +32,17 @@ pub mod json;
 
 type Tags = String;
 type Metadata = String;
+type EventSchema = Vec<Arc<Field>>;
 
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
     type Data;
+
     fn to_data(
         self,
         schema: HashMap<String, Arc<Field>>,
-    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), AnyError>;
+    ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
     fn into_recordbatch(
         self,
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index 9e71c87cb..c84d599e2 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -147,12 +147,12 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a
 
-fn fields_mismatch(schema: &Vec<Arc<Field>>, body: &Value) -> bool {
+fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
     for (name, val) in body.as_object().expect("body is of object variant") {
         if val.is_null() {
             continue;
         }
-        let Some(field) = get_field(&schema, name) else { return true };
+        let Some(field) = get_field(schema, name) else { return true };
         if !valid_type(field.data_type(), val) {
             return true;
         }
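
Notes on the API migrations above. The sketches below are illustrative, standalone reconstructions of the new upstream APIs this series adopts; they are not part of the patch, and any names in them that do not appear in the diff are invented for demonstration.

The change that drives most of the churn is arrow 40's schema model: `Fields` is a shared collection of `Arc<Field>` (aliased `FieldRef`), and `Schema::fields` is no longer freely mutable. A minimal sketch of both patterns, using only calls the patch itself relies on (field names are made up):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    // Fields holds Arc<Field>, so the server's schema maps now store
    // Arc<Field> and clone cheaply (a pointer bump, not a deep copy).
    let a = Arc::new(Field::new("a", DataType::Int64, true));
    let b = Arc::new(Field::new("b", DataType::Utf8, true));

    // Schema::new takes impl Into<Fields>; an iterator of Arc<Field>
    // collects straight into Fields, as commit_schema now does.
    let schema = Schema::new(vec![a, b].into_iter().collect::<Fields>());

    // schema.fields can no longer be sorted in place: collect the
    // Arc<Field>s into a Vec, sort, and build a fresh Schema,
    // which is exactly the v2_v3 pattern in schema_migration.rs.
    let mut fields: Vec<Arc<Field>> = schema.fields.iter().cloned().collect();
    fields.sort_by(|x, y| x.name().cmp(y.name()));
    let sorted = Schema::new(fields);
    assert_eq!(sorted.fields.len(), 2);
}
```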
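The largest behavioural change is the `decode` rewrite in `event/format/json.rs`: arrow-json 40 removes `Decoder::new`, `DecoderOptions`, and `next_batch` in favour of `ReaderBuilder::build_decoder`, whose `Decoder` takes any `serde::Serialize` rows and emits the batch on `flush`. A self-contained sketch of the same flow, with an invented two-column schema and rows:

```rust
use std::sync::Arc;
use arrow_json::reader::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let rows = vec![json!({"a": 1, "b": "x"}), json!({"a": 2, "b": "y"})];

    // build_decoder replaces Decoder::new + DecoderOptions; serialize
    // buffers the rows, flush drains them into a single RecordBatch.
    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(64)
        .with_coerce_primitive(false)
        .build_decoder()?;
    decoder.serialize(&rows)?;
    let batch = decoder.flush()?.expect("rows were buffered");
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```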
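`response.rs` changes because `record_batches_to_json_rows` now borrows its input as `&[&RecordBatch]` rather than `&[RecordBatch]`, so owned batches get collected into a `Vec` of references first. A hedged sketch of that adaptation with a made-up single-column batch:

```rust
use std::sync::Arc;
use arrow_array::{Int64Array, RecordBatch};
use arrow_json::writer::record_batches_to_json_rows;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2]))])?;

    // arrow-json 40 wants a slice of references, hence the intermediate
    // Vec<&RecordBatch>, mirroring QueryResponse::to_http above.
    let owned = vec![batch];
    let refs: Vec<&RecordBatch> = owned.iter().collect();
    let rows = record_batches_to_json_rows(&refs)?;
    assert_eq!(rows.len(), 2);
    Ok(())
}
```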
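`merged_reader.rs` changes type because arrow-ipc around this release stopped wrapping its input in an internal `BufReader`, so the caller buffers explicitly and the buffering shows up in the reader's type parameter. Sketch of constructing the new reader type (the path argument is a placeholder):

```rust
use std::{fs::File, io::BufReader};
use arrow_ipc::reader::StreamReader;

// The reader is now generic over the buffered source itself, which is why
// MergedRecordReader stores Vec<StreamReader<BufReader<File>>>.
fn open_reader(path: &str) -> Result<StreamReader<BufReader<File>>, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let reader = StreamReader::try_new(BufReader::new(file), None)?;
    Ok(reader)
}
```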
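Finally, the one-line edits in `localfs.rs` and `s3.rs` reflect DataFusion 26 turning `ListingOptions::file_sort_order` from an `Option` into a plain `Vec`, where an empty vector means no declared ordering. A sketch of the equivalent setup via the builder API, assuming `with_file_sort_order` is available in 26 with this shape; if not, the struct-literal form in the patch is the reference:

```rust
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

fn parquet_listing_options() -> ListingOptions {
    let format = ParquetFormat::default().with_enable_pruning(Some(true));
    ListingOptions::new(Arc::new(format))
        .with_file_extension(".parquet")
        // formerly file_sort_order: None; an empty Vec is the new default
        .with_file_sort_order(Vec::new())
}
```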