From 40471b018d945c57d32e642165d25ca512d05cc1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 25 Nov 2024 12:58:25 +0530 Subject: [PATCH 01/16] refactor: restructure as a library --- .gitignore | 3 +- Cargo.lock | 130 ++++------------- Cargo.toml | 132 +++++++++++++++++- server/build.rs => build.rs | 0 server/Cargo.toml | 129 ----------------- {server/src => src}/about.rs | 0 {server/src => src}/alerts/mod.rs | 2 +- {server/src => src}/alerts/parser.rs | 0 {server/src => src}/alerts/rule.rs | 0 {server/src => src}/alerts/target.rs | 0 {server/src => src}/analytics.rs | 0 {server/src => src}/banner.rs | 0 {server/src => src}/catalog/column.rs | 0 {server/src => src}/catalog/manifest.rs | 0 server/src/catalog.rs => src/catalog/mod.rs | 4 +- {server/src => src}/catalog/snapshot.rs | 0 {server/src => src}/cli.rs | 0 {server/src => src}/event/format/json.rs | 0 .../format.rs => src/event/format/mod.rs | 0 server/src/event.rs => src/event/mod.rs | 0 .../src => src}/event/writer/file_writer.rs | 0 .../src => src}/event/writer/mem_writer.rs | 0 .../writer.rs => src/event/writer/mod.rs | 0 {server/src => src}/handlers/airplane.rs | 0 {server/src => src}/handlers/http/about.rs | 0 {server/src => src}/handlers/http/cache.rs | 0 .../src => src}/handlers/http/cluster/mod.rs | 0 .../handlers/http/cluster/utils.rs | 0 .../src => src}/handlers/http/health_check.rs | 0 {server/src => src}/handlers/http/ingest.rs | 0 {server/src => src}/handlers/http/kinesis.rs | 0 {server/src => src}/handlers/http/llm.rs | 0 .../src => src}/handlers/http/logstream.rs | 0 .../src => src}/handlers/http/middleware.rs | 0 .../http.rs => src/handlers/http/mod.rs | 0 .../http/modal/ingest/ingestor_ingest.rs | 0 .../http/modal/ingest/ingestor_logstream.rs | 0 .../http/modal/ingest/ingestor_rbac.rs | 0 .../http/modal/ingest/ingestor_role.rs | 0 .../handlers/http/modal/ingest/mod.rs | 0 .../handlers/http/modal/ingest_server.rs | 0 .../src => src}/handlers/http/modal/mod.rs | 0 .../handlers/http/modal/query/mod.rs | 0 .../http/modal/query/querier_ingest.rs | 0 .../http/modal/query/querier_logstream.rs | 0 .../handlers/http/modal/query/querier_rbac.rs | 0 .../handlers/http/modal/query/querier_role.rs | 0 .../handlers/http/modal/query_server.rs | 4 +- .../src => src}/handlers/http/modal/server.rs | 0 .../handlers/http/modal/ssl_acceptor.rs | 0 .../handlers/http/modal/utils/ingest_utils.rs | 0 .../http/modal/utils/logstream_utils.rs | 0 .../handlers/http/modal/utils/mod.rs | 0 .../handlers/http/modal/utils/rbac_utils.rs | 0 {server/src => src}/handlers/http/oidc.rs | 0 {server/src => src}/handlers/http/otel.rs | 0 .../otel/opentelemetry.proto.common.v1.rs | 0 .../http/otel/opentelemetry.proto.logs.v1.rs | 0 .../otel/opentelemetry.proto.resource.v1.rs | 0 .../http/otel/opentelemetry/proto/README.md | 0 .../proto/common/v1/common.proto | 0 .../opentelemetry/proto/logs/v1/logs.proto | 0 .../proto/resource/v1/resource.proto | 0 .../src => src}/handlers/http/otel/proto.rs | 0 {server/src => src}/handlers/http/query.rs | 0 {server/src => src}/handlers/http/rbac.rs | 0 {server/src => src}/handlers/http/role.rs | 0 {server/src => src}/handlers/http/trino.rs | 0 .../handlers/http/users/dashboards.rs | 0 .../handlers/http/users/filters.rs | 0 .../src => src}/handlers/http/users/mod.rs | 0 {server/src => src}/handlers/livetail.rs | 0 server/src/handlers.rs => src/handlers/mod.rs | 0 {server/src => src}/hottier.rs | 0 server/src/main.rs => src/lib.rs | 34 +---- {server/src => src}/livetail.rs | 0 {server/src => src}/localcache.rs | 0 src/main.rs | 42 ++++++ {server/src => src}/metadata.rs | 0 {server/src => src}/metrics/mod.rs | 0 {server/src => src}/metrics/prom_utils.rs | 0 {server/src => src}/metrics/storage.rs | 0 .../migration/metadata_migration.rs | 0 .../src/migration.rs => src/migration/mod.rs | 0 .../src => src}/migration/schema_migration.rs | 0 .../migration/stream_metadata_migration.rs | 0 {server/src => src}/oidc.rs | 0 {server/src => src}/option.rs | 0 {server/src => src}/query/filter_optimizer.rs | 0 .../query/listing_table_builder.rs | 0 server/src/query.rs => src/query/mod.rs | 0 .../query/stream_schema_provider.rs | 2 +- {server/src => src}/querycache.rs | 0 {server/src => src}/rbac/map.rs | 0 server/src/rbac.rs => src/rbac/mod.rs | 0 {server/src => src}/rbac/role.rs | 0 {server/src => src}/rbac/user.rs | 0 {server/src => src}/response.rs | 0 {server/src => src}/static_schema.rs | 0 {server/src => src}/stats.rs | 0 {server/src => src}/storage/azure_blob.rs | 0 {server/src => src}/storage/localfs.rs | 0 {server/src => src}/storage/metrics_layer.rs | 0 server/src/storage.rs => src/storage/mod.rs | 0 {server/src => src}/storage/object_storage.rs | 0 {server/src => src}/storage/retention.rs | 0 {server/src => src}/storage/s3.rs | 0 {server/src => src}/storage/staging.rs | 0 {server/src => src}/storage/store_metadata.rs | 0 {server/src => src}/sync.rs | 0 {server/src => src}/users/dashboards.rs | 0 {server/src => src}/users/filters.rs | 0 {server/src => src}/users/mod.rs | 0 {server/src => src}/utils/actix.rs | 0 .../src => src}/utils/arrow/batch_adapter.rs | 0 {server/src => src}/utils/arrow/flight.rs | 0 .../src => src}/utils/arrow/merged_reader.rs | 0 .../utils/arrow.rs => src/utils/arrow/mod.rs | 0 .../src => src}/utils/arrow/reverse_reader.rs | 0 {server/src => src}/utils/header_parsing.rs | 0 {server/src => src}/utils/json/flatten.rs | 0 .../utils/json.rs => src/utils/json/mod.rs | 0 server/src/utils.rs => src/utils/mod.rs | 0 {server/src => src}/utils/uid.rs | 0 {server/src => src}/utils/update.rs | 0 {server/src => src}/validator.rs | 0 126 files changed, 211 insertions(+), 271 deletions(-) rename server/build.rs => build.rs (100%) delete mode 100644 server/Cargo.toml rename {server/src => src}/about.rs (100%) rename {server/src => src}/alerts/mod.rs (99%) rename {server/src => src}/alerts/parser.rs (100%) rename {server/src => src}/alerts/rule.rs (100%) rename {server/src => src}/alerts/target.rs (100%) rename {server/src => src}/analytics.rs (100%) rename {server/src => src}/banner.rs (100%) rename {server/src => src}/catalog/column.rs (100%) rename {server/src => src}/catalog/manifest.rs (100%) rename server/src/catalog.rs => src/catalog/mod.rs (99%) rename {server/src => src}/catalog/snapshot.rs (100%) rename {server/src => src}/cli.rs (100%) rename {server/src => src}/event/format/json.rs (100%) rename server/src/event/format.rs => src/event/format/mod.rs (100%) rename server/src/event.rs => src/event/mod.rs (100%) rename {server/src => src}/event/writer/file_writer.rs (100%) rename {server/src => src}/event/writer/mem_writer.rs (100%) rename server/src/event/writer.rs => src/event/writer/mod.rs (100%) rename {server/src => src}/handlers/airplane.rs (100%) rename {server/src => src}/handlers/http/about.rs (100%) rename {server/src => src}/handlers/http/cache.rs (100%) rename {server/src => src}/handlers/http/cluster/mod.rs (100%) rename {server/src => src}/handlers/http/cluster/utils.rs (100%) rename {server/src => src}/handlers/http/health_check.rs (100%) rename {server/src => src}/handlers/http/ingest.rs (100%) rename {server/src => src}/handlers/http/kinesis.rs (100%) rename {server/src => src}/handlers/http/llm.rs (100%) rename {server/src => src}/handlers/http/logstream.rs (100%) rename {server/src => src}/handlers/http/middleware.rs (100%) rename server/src/handlers/http.rs => src/handlers/http/mod.rs (100%) rename {server/src => src}/handlers/http/modal/ingest/ingestor_ingest.rs (100%) rename {server/src => src}/handlers/http/modal/ingest/ingestor_logstream.rs (100%) rename {server/src => src}/handlers/http/modal/ingest/ingestor_rbac.rs (100%) rename {server/src => src}/handlers/http/modal/ingest/ingestor_role.rs (100%) rename {server/src => src}/handlers/http/modal/ingest/mod.rs (100%) rename {server/src => src}/handlers/http/modal/ingest_server.rs (100%) rename {server/src => src}/handlers/http/modal/mod.rs (100%) rename {server/src => src}/handlers/http/modal/query/mod.rs (100%) rename {server/src => src}/handlers/http/modal/query/querier_ingest.rs (100%) rename {server/src => src}/handlers/http/modal/query/querier_logstream.rs (100%) rename {server/src => src}/handlers/http/modal/query/querier_rbac.rs (100%) rename {server/src => src}/handlers/http/modal/query/querier_role.rs (100%) rename {server/src => src}/handlers/http/modal/query_server.rs (99%) rename {server/src => src}/handlers/http/modal/server.rs (100%) rename {server/src => src}/handlers/http/modal/ssl_acceptor.rs (100%) rename {server/src => src}/handlers/http/modal/utils/ingest_utils.rs (100%) rename {server/src => src}/handlers/http/modal/utils/logstream_utils.rs (100%) rename {server/src => src}/handlers/http/modal/utils/mod.rs (100%) rename {server/src => src}/handlers/http/modal/utils/rbac_utils.rs (100%) rename {server/src => src}/handlers/http/oidc.rs (100%) rename {server/src => src}/handlers/http/otel.rs (100%) rename {server/src => src}/handlers/http/otel/opentelemetry.proto.common.v1.rs (100%) rename {server/src => src}/handlers/http/otel/opentelemetry.proto.logs.v1.rs (100%) rename {server/src => src}/handlers/http/otel/opentelemetry.proto.resource.v1.rs (100%) rename {server/src => src}/handlers/http/otel/opentelemetry/proto/README.md (100%) rename {server/src => src}/handlers/http/otel/opentelemetry/proto/common/v1/common.proto (100%) rename {server/src => src}/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto (100%) rename {server/src => src}/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto (100%) rename {server/src => src}/handlers/http/otel/proto.rs (100%) rename {server/src => src}/handlers/http/query.rs (100%) rename {server/src => src}/handlers/http/rbac.rs (100%) rename {server/src => src}/handlers/http/role.rs (100%) rename {server/src => src}/handlers/http/trino.rs (100%) rename {server/src => src}/handlers/http/users/dashboards.rs (100%) rename {server/src => src}/handlers/http/users/filters.rs (100%) rename {server/src => src}/handlers/http/users/mod.rs (100%) rename {server/src => src}/handlers/livetail.rs (100%) rename server/src/handlers.rs => src/handlers/mod.rs (100%) rename {server/src => src}/hottier.rs (100%) rename server/src/main.rs => src/lib.rs (63%) rename {server/src => src}/livetail.rs (100%) rename {server/src => src}/localcache.rs (100%) create mode 100644 src/main.rs rename {server/src => src}/metadata.rs (100%) rename {server/src => src}/metrics/mod.rs (100%) rename {server/src => src}/metrics/prom_utils.rs (100%) rename {server/src => src}/metrics/storage.rs (100%) rename {server/src => src}/migration/metadata_migration.rs (100%) rename server/src/migration.rs => src/migration/mod.rs (100%) rename {server/src => src}/migration/schema_migration.rs (100%) rename {server/src => src}/migration/stream_metadata_migration.rs (100%) rename {server/src => src}/oidc.rs (100%) rename {server/src => src}/option.rs (100%) rename {server/src => src}/query/filter_optimizer.rs (100%) rename {server/src => src}/query/listing_table_builder.rs (100%) rename server/src/query.rs => src/query/mod.rs (100%) rename {server/src => src}/query/stream_schema_provider.rs (99%) rename {server/src => src}/querycache.rs (100%) rename {server/src => src}/rbac/map.rs (100%) rename server/src/rbac.rs => src/rbac/mod.rs (100%) rename {server/src => src}/rbac/role.rs (100%) rename {server/src => src}/rbac/user.rs (100%) rename {server/src => src}/response.rs (100%) rename {server/src => src}/static_schema.rs (100%) rename {server/src => src}/stats.rs (100%) rename {server/src => src}/storage/azure_blob.rs (100%) rename {server/src => src}/storage/localfs.rs (100%) rename {server/src => src}/storage/metrics_layer.rs (100%) rename server/src/storage.rs => src/storage/mod.rs (100%) rename {server/src => src}/storage/object_storage.rs (100%) rename {server/src => src}/storage/retention.rs (100%) rename {server/src => src}/storage/s3.rs (100%) rename {server/src => src}/storage/staging.rs (100%) rename {server/src => src}/storage/store_metadata.rs (100%) rename {server/src => src}/sync.rs (100%) rename {server/src => src}/users/dashboards.rs (100%) rename {server/src => src}/users/filters.rs (100%) rename {server/src => src}/users/mod.rs (100%) rename {server/src => src}/utils/actix.rs (100%) rename {server/src => src}/utils/arrow/batch_adapter.rs (100%) rename {server/src => src}/utils/arrow/flight.rs (100%) rename {server/src => src}/utils/arrow/merged_reader.rs (100%) rename server/src/utils/arrow.rs => src/utils/arrow/mod.rs (100%) rename {server/src => src}/utils/arrow/reverse_reader.rs (100%) rename {server/src => src}/utils/header_parsing.rs (100%) rename {server/src => src}/utils/json/flatten.rs (100%) rename server/src/utils/json.rs => src/utils/json/mod.rs (100%) rename server/src/utils.rs => src/utils/mod.rs (100%) rename {server/src => src}/utils/uid.rs (100%) rename {server/src => src}/utils/update.rs (100%) rename {server/src => src}/validator.rs (100%) diff --git a/.gitignore b/.gitignore index 5bbc1b194..57ea8e65e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ target data* -staging* +staging/ limitcache examples cert.pem @@ -14,4 +14,3 @@ parseable parseable_* parseable-env-secret cache - diff --git a/Cargo.lock b/Cargo.lock index 91b670830..eb66004d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,7 +672,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -1644,7 +1644,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -1829,6 +1829,29 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -2697,15 +2720,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "matchit" version = "0.7.3" @@ -2818,16 +2832,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num" version = "0.4.2" @@ -3008,12 +3012,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "parking" version = "2.2.0" @@ -3119,6 +3117,7 @@ dependencies = [ "crossterm", "datafusion", "derive_more", + "env_logger", "fs_extra", "futures", "futures-util", @@ -3168,7 +3167,6 @@ dependencies = [ "tonic", "tonic-web", "tower-http 0.6.1", - "tracing-subscriber", "ulid", "uptime_lib", "ureq", @@ -3633,17 +3631,8 @@ checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.8", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -3654,7 +3643,7 @@ checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -3663,12 +3652,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.5" @@ -4140,15 +4123,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sharded-slab" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" -dependencies = [ - "lazy_static", -] - [[package]] name = "shlex" version = "1.3.0" @@ -4444,16 +4418,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "thread_local" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" -dependencies = [ - "cfg-if", - "once_cell", -] - [[package]] name = "thrift" version = "0.17.0" @@ -4789,36 +4753,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" -dependencies = [ - "matchers", - "nu-ansi-term", - "once_cell", - "regex", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", ] [[package]] @@ -4988,12 +4922,6 @@ dependencies = [ "syn 2.0.79", ] -[[package]] -name = "valuable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" - [[package]] name = "vergen" version = "8.3.1" diff --git a/Cargo.toml b/Cargo.toml index 156820ec5..4315f7f05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,129 @@ -[workspace] -members = ["server"] -resolver = "2" +[package] +name = "parseable" +version = "1.6.2" +authors = ["Parseable Team "] +edition = "2021" +rust-version = "1.77.1" +categories = ["logging", "observability", "log analytics"] +build = "build.rs" + +[dependencies] +### apache arrow/datafusion dependencies +# arrow = "51.0.0" +arrow-schema = { version = "53.0.0", features = ["serde"] } +arrow-array = { version = "53.0.0" } +arrow-json = "53.0.0" +arrow-ipc = { version = "53.0.0", features = ["zstd"] } +arrow-select = "53.0.0" +datafusion = "42.0.0" +object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] } +parquet = "53.0.0" +arrow-flight = { version = "53.0.0", features = [ "tls" ] } +tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] } +tonic-web = "0.12.3" +tower-http = { version = "0.6.1", features = ["cors"] } + +### actix dependencies +actix-web-httpauth = "0.8" +actix-web = { version = "4.9.0", features = ["rustls-0_22"] } +actix-cors = "0.7.0" +actix-web-prometheus = { version = "0.1" } +actix-web-static-files = "4.0" +mime = "0.3.17" + +### other dependencies +anyhow = { version = "1.0", features = ["backtrace"] } +argon2 = "0.5.0" +async-trait = "0.1.82" +base64 = "0.22.0" +lazy_static = "1.4" +bytes = "1.4" +byteorder = "1.4.3" +bzip2 = { version = "*", features = ["static"] } +cookie = "0.18.1" +chrono = "0.4" +chrono-humanize = "0.2" +clap = { version = "4.1", default-features = false, features = [ + "std", + "color", + "help", + "derive", + "env", + "cargo", + "error-context", +] } +clokwerk = "0.4" +crossterm = "0.28.1" +derive_more = "0.99.18" +env_logger = "0.11.3" +fs_extra = "1.3" +futures = "0.3" +futures-util = "0.3.28" +hex = "0.4" +hostname = "0.4.0" +http = "0.2.7" +humantime-serde = "1.1" +itertools = "0.13.0" +log = "0.4" +num_cpus = "1.15" +once_cell = "1.17.1" +prometheus = { version = "0.13", features = ["process"] } +rand = "0.8.5" +regex = "1.7.3" +relative-path = { version = "1.7", features = ["serde"] } +reqwest = { version = "0.11.27", default-features = false, features = [ + "rustls-tls", + "json", +] } # cannot update cause rustls is not latest `see rustls` +rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet +rustls-pemfile = "2.1.2" +semver = "1.0" +serde = { version = "1.0", features = ["rc", "derive"] } +serde_json = "1.0" +static-files = "0.2" +sysinfo = "0.31.4" +thiserror = "1.0.64" +thread-priority = "1.0.0" +tokio = { version = "1.28", default-features = false, features = [ + "sync", + "macros", + "fs", +] } +tokio-stream = { version = "0.1", features = ["fs"] } +ulid = { version = "1.0", features = ["serde"] } +uptime_lib = "0.3.0" +xxhash-rust = { version = "0.8", features = ["xxh3"] } +xz2 = { version = "*", features = ["static"] } +nom = "7.1.3" +humantime = "2.1.0" +human-size = "0.4" +openid = { version = "0.15.0", default-features = false, features = ["rustls"] } +url = "2.4.0" +http-auth-basic = "0.3.3" +serde_repr = "0.1.17" +hashlru = { version = "0.11.0", features = ["serde"] } +path-clean = "1.0.1" +prost = "0.13.3" +prometheus-parse = "0.2.5" +sha2 = "0.10.8" + +[build-dependencies] +cargo_toml = "0.20.1" +sha1_smol = { version = "1.0", features = ["std"] } +static-files = "0.2" +ureq = "2.6" +vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] } +zip = { version = "2.2.0", default-features = false, features = ["deflate"] } +url = "2.4.0" +prost-build = "0.13.3" + +[dev-dependencies] +maplit = "1.0" +rstest = "0.23.0" + +[package.metadata.parseable_ui] +assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.11/build.zip" +assets-sha1 = "3f0c0f0e9fe23c6a01f0eb45115da4bfe29f9c3f" + +[features] +debug = [] diff --git a/server/build.rs b/build.rs similarity index 100% rename from server/build.rs rename to build.rs diff --git a/server/Cargo.toml b/server/Cargo.toml deleted file mode 100644 index 50f357452..000000000 --- a/server/Cargo.toml +++ /dev/null @@ -1,129 +0,0 @@ -[package] -name = "parseable" -version = "1.6.2" -authors = ["Parseable Team "] -edition = "2021" -rust-version = "1.77.1" -categories = ["logging", "observability", "log analytics"] -build = "build.rs" - -[dependencies] -### apache arrow/datafusion dependencies -# arrow = "51.0.0" -arrow-schema = { version = "53.0.0", features = ["serde"] } -arrow-array = { version = "53.0.0" } -arrow-json = "53.0.0" -arrow-ipc = { version = "53.0.0", features = ["zstd"] } -arrow-select = "53.0.0" -datafusion = "42.0.0" -object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] } -parquet = "53.0.0" -arrow-flight = { version = "53.0.0", features = [ "tls" ] } -tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] } -tonic-web = "0.12.3" -tower-http = { version = "0.6.1", features = ["cors"] } - -### actix dependencies -actix-web-httpauth = "0.8" -actix-web = { version = "4.9.0", features = ["rustls-0_22"] } -actix-cors = "0.7.0" -actix-web-prometheus = { version = "0.1" } -actix-web-static-files = "4.0" -mime = "0.3.17" - -### other dependencies -anyhow = { version = "1.0", features = ["backtrace"] } -argon2 = "0.5.0" -async-trait = "0.1.82" -base64 = "0.22.0" -lazy_static = "1.4" -bytes = "1.4" -byteorder = "1.4.3" -bzip2 = { version = "*", features = ["static"] } -cookie = "0.18.1" -chrono = "0.4" -chrono-humanize = "0.2" -clap = { version = "4.1", default-features = false, features = [ - "std", - "color", - "help", - "derive", - "env", - "cargo", - "error-context", -] } -clokwerk = "0.4" -crossterm = "0.28.1" -derive_more = "0.99.18" -fs_extra = "1.3" -futures = "0.3" -futures-util = "0.3.28" -hex = "0.4" -hostname = "0.4.0" -http = "0.2.7" -humantime-serde = "1.1" -itertools = "0.13.0" -log = "0.4" -num_cpus = "1.15" -once_cell = "1.17.1" -prometheus = { version = "0.13", features = ["process"] } -rand = "0.8.5" -regex = "1.7.3" -relative-path = { version = "1.7", features = ["serde"] } -reqwest = { version = "0.11.27", default-features = false, features = [ - "rustls-tls", - "json", -] } # cannot update cause rustls is not latest `see rustls` -rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet -rustls-pemfile = "2.1.2" -semver = "1.0" -serde = { version = "1.0", features = ["rc", "derive"] } -serde_json = "1.0" -static-files = "0.2" -sysinfo = "0.31.4" -thiserror = "1.0.64" -thread-priority = "1.0.0" -tokio = { version = "1.28", default-features = false, features = [ - "sync", - "macros", - "fs", -] } -tokio-stream = { version = "0.1", features = ["fs"] } -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } -ulid = { version = "1.0", features = ["serde"] } -uptime_lib = "0.3.0" -xxhash-rust = { version = "0.8", features = ["xxh3"] } -xz2 = { version = "*", features = ["static"] } -nom = "7.1.3" -humantime = "2.1.0" -human-size = "0.4" -openid = { version = "0.15.0", default-features = false, features = ["rustls"] } -url = "2.4.0" -http-auth-basic = "0.3.3" -serde_repr = "0.1.17" -hashlru = { version = "0.11.0", features = ["serde"] } -path-clean = "1.0.1" -prost = "0.13.3" -prometheus-parse = "0.2.5" -sha2 = "0.10.8" - -[build-dependencies] -cargo_toml = "0.20.1" -sha1_smol = { version = "1.0", features = ["std"] } -static-files = "0.2" -ureq = "2.6" -vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] } -zip = { version = "2.2.0", default-features = false, features = ["deflate"] } -url = "2.4.0" -prost-build = "0.13.3" - -[dev-dependencies] -maplit = "1.0" -rstest = "0.23.0" - -[package.metadata.parseable_ui] -assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.11/build.zip" -assets-sha1 = "3f0c0f0e9fe23c6a01f0eb45115da4bfe29f9c3f" - -[features] -debug = [] diff --git a/server/src/about.rs b/src/about.rs similarity index 100% rename from server/src/about.rs rename to src/about.rs diff --git a/server/src/alerts/mod.rs b/src/alerts/mod.rs similarity index 99% rename from server/src/alerts/mod.rs rename to src/alerts/mod.rs index 587bad773..9523e5e1f 100644 --- a/server/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -31,9 +31,9 @@ pub mod rule; pub mod target; use crate::metrics::ALERTS_STATES; +use crate::option::CONFIG; use crate::utils::arrow::get_field; use crate::utils::uid; -use crate::CONFIG; use crate::{storage, utils}; pub use self::rule::Rule; diff --git a/server/src/alerts/parser.rs b/src/alerts/parser.rs similarity index 100% rename from server/src/alerts/parser.rs rename to src/alerts/parser.rs diff --git a/server/src/alerts/rule.rs b/src/alerts/rule.rs similarity index 100% rename from server/src/alerts/rule.rs rename to src/alerts/rule.rs diff --git a/server/src/alerts/target.rs b/src/alerts/target.rs similarity index 100% rename from server/src/alerts/target.rs rename to src/alerts/target.rs diff --git a/server/src/analytics.rs b/src/analytics.rs similarity index 100% rename from server/src/analytics.rs rename to src/analytics.rs diff --git a/server/src/banner.rs b/src/banner.rs similarity index 100% rename from server/src/banner.rs rename to src/banner.rs diff --git a/server/src/catalog/column.rs b/src/catalog/column.rs similarity index 100% rename from server/src/catalog/column.rs rename to src/catalog/column.rs diff --git a/server/src/catalog/manifest.rs b/src/catalog/manifest.rs similarity index 100% rename from server/src/catalog/manifest.rs rename to src/catalog/manifest.rs diff --git a/server/src/catalog.rs b/src/catalog/mod.rs similarity index 99% rename from server/src/catalog.rs rename to src/catalog/mod.rs index e93f6cdd4..19a557647 100644 --- a/server/src/catalog.rs +++ b/src/catalog/mod.rs @@ -19,10 +19,11 @@ use std::{io::ErrorKind, sync::Arc}; use self::{column::Column, snapshot::ManifestItem}; +use crate::handlers; use crate::handlers::http::base_path_without_preceding_slash; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::option::CONFIG; +use crate::option::{Mode, CONFIG}; use crate::stats::{ event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats, }; @@ -32,7 +33,6 @@ use crate::{ query::PartialTimeFilter, storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError}, }; -use crate::{handlers, Mode}; use bytes::Bytes; use chrono::{DateTime, Local, NaiveTime, Utc}; use relative_path::RelativePathBuf; diff --git a/server/src/catalog/snapshot.rs b/src/catalog/snapshot.rs similarity index 100% rename from server/src/catalog/snapshot.rs rename to src/catalog/snapshot.rs diff --git a/server/src/cli.rs b/src/cli.rs similarity index 100% rename from server/src/cli.rs rename to src/cli.rs diff --git a/server/src/event/format/json.rs b/src/event/format/json.rs similarity index 100% rename from server/src/event/format/json.rs rename to src/event/format/json.rs diff --git a/server/src/event/format.rs b/src/event/format/mod.rs similarity index 100% rename from server/src/event/format.rs rename to src/event/format/mod.rs diff --git a/server/src/event.rs b/src/event/mod.rs similarity index 100% rename from server/src/event.rs rename to src/event/mod.rs diff --git a/server/src/event/writer/file_writer.rs b/src/event/writer/file_writer.rs similarity index 100% rename from server/src/event/writer/file_writer.rs rename to src/event/writer/file_writer.rs diff --git a/server/src/event/writer/mem_writer.rs b/src/event/writer/mem_writer.rs similarity index 100% rename from server/src/event/writer/mem_writer.rs rename to src/event/writer/mem_writer.rs diff --git a/server/src/event/writer.rs b/src/event/writer/mod.rs similarity index 100% rename from server/src/event/writer.rs rename to src/event/writer/mod.rs diff --git a/server/src/handlers/airplane.rs b/src/handlers/airplane.rs similarity index 100% rename from server/src/handlers/airplane.rs rename to src/handlers/airplane.rs diff --git a/server/src/handlers/http/about.rs b/src/handlers/http/about.rs similarity index 100% rename from server/src/handlers/http/about.rs rename to src/handlers/http/about.rs diff --git a/server/src/handlers/http/cache.rs b/src/handlers/http/cache.rs similarity index 100% rename from server/src/handlers/http/cache.rs rename to src/handlers/http/cache.rs diff --git a/server/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs similarity index 100% rename from server/src/handlers/http/cluster/mod.rs rename to src/handlers/http/cluster/mod.rs diff --git a/server/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs similarity index 100% rename from server/src/handlers/http/cluster/utils.rs rename to src/handlers/http/cluster/utils.rs diff --git a/server/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs similarity index 100% rename from server/src/handlers/http/health_check.rs rename to src/handlers/http/health_check.rs diff --git a/server/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs similarity index 100% rename from server/src/handlers/http/ingest.rs rename to src/handlers/http/ingest.rs diff --git a/server/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs similarity index 100% rename from server/src/handlers/http/kinesis.rs rename to src/handlers/http/kinesis.rs diff --git a/server/src/handlers/http/llm.rs b/src/handlers/http/llm.rs similarity index 100% rename from server/src/handlers/http/llm.rs rename to src/handlers/http/llm.rs diff --git a/server/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs similarity index 100% rename from server/src/handlers/http/logstream.rs rename to src/handlers/http/logstream.rs diff --git a/server/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs similarity index 100% rename from server/src/handlers/http/middleware.rs rename to src/handlers/http/middleware.rs diff --git a/server/src/handlers/http.rs b/src/handlers/http/mod.rs similarity index 100% rename from server/src/handlers/http.rs rename to src/handlers/http/mod.rs diff --git a/server/src/handlers/http/modal/ingest/ingestor_ingest.rs b/src/handlers/http/modal/ingest/ingestor_ingest.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_ingest.rs rename to src/handlers/http/modal/ingest/ingestor_ingest.rs diff --git a/server/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_logstream.rs rename to src/handlers/http/modal/ingest/ingestor_logstream.rs diff --git a/server/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_rbac.rs rename to src/handlers/http/modal/ingest/ingestor_rbac.rs diff --git a/server/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_role.rs rename to src/handlers/http/modal/ingest/ingestor_role.rs diff --git a/server/src/handlers/http/modal/ingest/mod.rs b/src/handlers/http/modal/ingest/mod.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/mod.rs rename to src/handlers/http/modal/ingest/mod.rs diff --git a/server/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs similarity index 100% rename from server/src/handlers/http/modal/ingest_server.rs rename to src/handlers/http/modal/ingest_server.rs diff --git a/server/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs similarity index 100% rename from server/src/handlers/http/modal/mod.rs rename to src/handlers/http/modal/mod.rs diff --git a/server/src/handlers/http/modal/query/mod.rs b/src/handlers/http/modal/query/mod.rs similarity index 100% rename from server/src/handlers/http/modal/query/mod.rs rename to src/handlers/http/modal/query/mod.rs diff --git a/server/src/handlers/http/modal/query/querier_ingest.rs b/src/handlers/http/modal/query/querier_ingest.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_ingest.rs rename to src/handlers/http/modal/query/querier_ingest.rs diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_logstream.rs rename to src/handlers/http/modal/query/querier_logstream.rs diff --git a/server/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_rbac.rs rename to src/handlers/http/modal/query/querier_rbac.rs diff --git a/server/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_role.rs rename to src/handlers/http/modal/query/querier_role.rs diff --git a/server/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs similarity index 99% rename from server/src/handlers/http/modal/query_server.rs rename to src/handlers/http/modal/query_server.rs index 302bd977e..2b056d3e0 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -37,12 +37,12 @@ use async_trait::async_trait; use std::sync::Arc; use tokio::sync::{oneshot, Mutex}; -use crate::option::CONFIG; +use crate::{option::CONFIG, ParseableServer}; use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; -use super::{OpenIdClient, ParseableServer}; +use super::OpenIdClient; #[derive(Default, Debug)] pub struct QueryServer; diff --git a/server/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs similarity index 100% rename from server/src/handlers/http/modal/server.rs rename to src/handlers/http/modal/server.rs diff --git a/server/src/handlers/http/modal/ssl_acceptor.rs b/src/handlers/http/modal/ssl_acceptor.rs similarity index 100% rename from server/src/handlers/http/modal/ssl_acceptor.rs rename to src/handlers/http/modal/ssl_acceptor.rs diff --git a/server/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs similarity index 100% rename from server/src/handlers/http/modal/utils/ingest_utils.rs rename to src/handlers/http/modal/utils/ingest_utils.rs diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs similarity index 100% rename from server/src/handlers/http/modal/utils/logstream_utils.rs rename to src/handlers/http/modal/utils/logstream_utils.rs diff --git a/server/src/handlers/http/modal/utils/mod.rs b/src/handlers/http/modal/utils/mod.rs similarity index 100% rename from server/src/handlers/http/modal/utils/mod.rs rename to src/handlers/http/modal/utils/mod.rs diff --git a/server/src/handlers/http/modal/utils/rbac_utils.rs b/src/handlers/http/modal/utils/rbac_utils.rs similarity index 100% rename from server/src/handlers/http/modal/utils/rbac_utils.rs rename to src/handlers/http/modal/utils/rbac_utils.rs diff --git a/server/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs similarity index 100% rename from server/src/handlers/http/oidc.rs rename to src/handlers/http/oidc.rs diff --git a/server/src/handlers/http/otel.rs b/src/handlers/http/otel.rs similarity index 100% rename from server/src/handlers/http/otel.rs rename to src/handlers/http/otel.rs diff --git a/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs b/src/handlers/http/otel/opentelemetry.proto.common.v1.rs similarity index 100% rename from server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs rename to src/handlers/http/otel/opentelemetry.proto.common.v1.rs diff --git a/server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs b/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs similarity index 100% rename from server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs rename to src/handlers/http/otel/opentelemetry.proto.logs.v1.rs diff --git a/server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs b/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs similarity index 100% rename from server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs rename to src/handlers/http/otel/opentelemetry.proto.resource.v1.rs diff --git a/server/src/handlers/http/otel/opentelemetry/proto/README.md b/src/handlers/http/otel/opentelemetry/proto/README.md similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/README.md rename to src/handlers/http/otel/opentelemetry/proto/README.md diff --git a/server/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto b/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto rename to src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto diff --git a/server/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto b/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto rename to src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto diff --git a/server/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto b/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto rename to src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto diff --git a/server/src/handlers/http/otel/proto.rs b/src/handlers/http/otel/proto.rs similarity index 100% rename from server/src/handlers/http/otel/proto.rs rename to src/handlers/http/otel/proto.rs diff --git a/server/src/handlers/http/query.rs b/src/handlers/http/query.rs similarity index 100% rename from server/src/handlers/http/query.rs rename to src/handlers/http/query.rs diff --git a/server/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs similarity index 100% rename from server/src/handlers/http/rbac.rs rename to src/handlers/http/rbac.rs diff --git a/server/src/handlers/http/role.rs b/src/handlers/http/role.rs similarity index 100% rename from server/src/handlers/http/role.rs rename to src/handlers/http/role.rs diff --git a/server/src/handlers/http/trino.rs b/src/handlers/http/trino.rs similarity index 100% rename from server/src/handlers/http/trino.rs rename to src/handlers/http/trino.rs diff --git a/server/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs similarity index 100% rename from server/src/handlers/http/users/dashboards.rs rename to src/handlers/http/users/dashboards.rs diff --git a/server/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs similarity index 100% rename from server/src/handlers/http/users/filters.rs rename to src/handlers/http/users/filters.rs diff --git a/server/src/handlers/http/users/mod.rs b/src/handlers/http/users/mod.rs similarity index 100% rename from server/src/handlers/http/users/mod.rs rename to src/handlers/http/users/mod.rs diff --git a/server/src/handlers/livetail.rs b/src/handlers/livetail.rs similarity index 100% rename from server/src/handlers/livetail.rs rename to src/handlers/livetail.rs diff --git a/server/src/handlers.rs b/src/handlers/mod.rs similarity index 100% rename from server/src/handlers.rs rename to src/handlers/mod.rs diff --git a/server/src/hottier.rs b/src/hottier.rs similarity index 100% rename from server/src/hottier.rs rename to src/hottier.rs diff --git a/server/src/main.rs b/src/lib.rs similarity index 63% rename from server/src/main.rs rename to src/lib.rs index 1a8562160..857542351 100644 --- a/server/src/main.rs +++ b/src/lib.rs @@ -31,7 +31,7 @@ mod metadata; mod metrics; mod migration; mod oidc; -mod option; +pub mod option; mod query; mod querycache; mod rbac; @@ -44,34 +44,8 @@ mod users; mod utils; mod validator; -use std::sync::Arc; - -use handlers::http::modal::ParseableServer; -use option::{Mode, CONFIG}; -use tracing_subscriber::EnvFilter; - -use crate::handlers::http::modal::{ - ingest_server::IngestServer, query_server::QueryServer, server::Server, +pub use handlers::http::modal::{ + ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer, }; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; - -#[actix_web::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .init(); - - // these are empty ptrs so mem footprint should be minimal - let server: Arc = match CONFIG.parseable.mode { - Mode::Query => Arc::new(QueryServer), - Mode::Ingest => Arc::new(IngestServer), - - Mode::All => Arc::new(Server), - }; - - server.init().await?; - - Ok(()) -} +pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; diff --git a/server/src/livetail.rs b/src/livetail.rs similarity index 100% rename from server/src/livetail.rs rename to src/livetail.rs diff --git a/server/src/localcache.rs b/src/localcache.rs similarity index 100% rename from server/src/localcache.rs rename to src/localcache.rs diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 000000000..b689533e6 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,42 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::sync::Arc; + +use parseable::{ + option::{Mode, CONFIG}, + IngestServer, ParseableServer, QueryServer, Server, +}; + +#[actix_web::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // these are empty ptrs so mem footprint should be minimal + let server: Arc = match CONFIG.parseable.mode { + Mode::Query => Arc::new(QueryServer), + + Mode::Ingest => Arc::new(IngestServer), + + Mode::All => Arc::new(Server), + }; + + server.init().await?; + + Ok(()) +} diff --git a/server/src/metadata.rs b/src/metadata.rs similarity index 100% rename from server/src/metadata.rs rename to src/metadata.rs diff --git a/server/src/metrics/mod.rs b/src/metrics/mod.rs similarity index 100% rename from server/src/metrics/mod.rs rename to src/metrics/mod.rs diff --git a/server/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs similarity index 100% rename from server/src/metrics/prom_utils.rs rename to src/metrics/prom_utils.rs diff --git a/server/src/metrics/storage.rs b/src/metrics/storage.rs similarity index 100% rename from server/src/metrics/storage.rs rename to src/metrics/storage.rs diff --git a/server/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs similarity index 100% rename from server/src/migration/metadata_migration.rs rename to src/migration/metadata_migration.rs diff --git a/server/src/migration.rs b/src/migration/mod.rs similarity index 100% rename from server/src/migration.rs rename to src/migration/mod.rs diff --git a/server/src/migration/schema_migration.rs b/src/migration/schema_migration.rs similarity index 100% rename from server/src/migration/schema_migration.rs rename to src/migration/schema_migration.rs diff --git a/server/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs similarity index 100% rename from server/src/migration/stream_metadata_migration.rs rename to src/migration/stream_metadata_migration.rs diff --git a/server/src/oidc.rs b/src/oidc.rs similarity index 100% rename from server/src/oidc.rs rename to src/oidc.rs diff --git a/server/src/option.rs b/src/option.rs similarity index 100% rename from server/src/option.rs rename to src/option.rs diff --git a/server/src/query/filter_optimizer.rs b/src/query/filter_optimizer.rs similarity index 100% rename from server/src/query/filter_optimizer.rs rename to src/query/filter_optimizer.rs diff --git a/server/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs similarity index 100% rename from server/src/query/listing_table_builder.rs rename to src/query/listing_table_builder.rs diff --git a/server/src/query.rs b/src/query/mod.rs similarity index 100% rename from server/src/query.rs rename to src/query/mod.rs diff --git a/server/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs similarity index 99% rename from server/src/query/stream_schema_provider.rs rename to src/query/stream_schema_provider.rs index 6f1ceecf4..d5ccea485 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -18,7 +18,7 @@ use crate::catalog::manifest::File; use crate::hottier::HotTierManager; -use crate::Mode; +use crate::option::Mode; use crate::{ catalog::snapshot::{self, Snapshot}, storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, diff --git a/server/src/querycache.rs b/src/querycache.rs similarity index 100% rename from server/src/querycache.rs rename to src/querycache.rs diff --git a/server/src/rbac/map.rs b/src/rbac/map.rs similarity index 100% rename from server/src/rbac/map.rs rename to src/rbac/map.rs diff --git a/server/src/rbac.rs b/src/rbac/mod.rs similarity index 100% rename from server/src/rbac.rs rename to src/rbac/mod.rs diff --git a/server/src/rbac/role.rs b/src/rbac/role.rs similarity index 100% rename from server/src/rbac/role.rs rename to src/rbac/role.rs diff --git a/server/src/rbac/user.rs b/src/rbac/user.rs similarity index 100% rename from server/src/rbac/user.rs rename to src/rbac/user.rs diff --git a/server/src/response.rs b/src/response.rs similarity index 100% rename from server/src/response.rs rename to src/response.rs diff --git a/server/src/static_schema.rs b/src/static_schema.rs similarity index 100% rename from server/src/static_schema.rs rename to src/static_schema.rs diff --git a/server/src/stats.rs b/src/stats.rs similarity index 100% rename from server/src/stats.rs rename to src/stats.rs diff --git a/server/src/storage/azure_blob.rs b/src/storage/azure_blob.rs similarity index 100% rename from server/src/storage/azure_blob.rs rename to src/storage/azure_blob.rs diff --git a/server/src/storage/localfs.rs b/src/storage/localfs.rs similarity index 100% rename from server/src/storage/localfs.rs rename to src/storage/localfs.rs diff --git a/server/src/storage/metrics_layer.rs b/src/storage/metrics_layer.rs similarity index 100% rename from server/src/storage/metrics_layer.rs rename to src/storage/metrics_layer.rs diff --git a/server/src/storage.rs b/src/storage/mod.rs similarity index 100% rename from server/src/storage.rs rename to src/storage/mod.rs diff --git a/server/src/storage/object_storage.rs b/src/storage/object_storage.rs similarity index 100% rename from server/src/storage/object_storage.rs rename to src/storage/object_storage.rs diff --git a/server/src/storage/retention.rs b/src/storage/retention.rs similarity index 100% rename from server/src/storage/retention.rs rename to src/storage/retention.rs diff --git a/server/src/storage/s3.rs b/src/storage/s3.rs similarity index 100% rename from server/src/storage/s3.rs rename to src/storage/s3.rs diff --git a/server/src/storage/staging.rs b/src/storage/staging.rs similarity index 100% rename from server/src/storage/staging.rs rename to src/storage/staging.rs diff --git a/server/src/storage/store_metadata.rs b/src/storage/store_metadata.rs similarity index 100% rename from server/src/storage/store_metadata.rs rename to src/storage/store_metadata.rs diff --git a/server/src/sync.rs b/src/sync.rs similarity index 100% rename from server/src/sync.rs rename to src/sync.rs diff --git a/server/src/users/dashboards.rs b/src/users/dashboards.rs similarity index 100% rename from server/src/users/dashboards.rs rename to src/users/dashboards.rs diff --git a/server/src/users/filters.rs b/src/users/filters.rs similarity index 100% rename from server/src/users/filters.rs rename to src/users/filters.rs diff --git a/server/src/users/mod.rs b/src/users/mod.rs similarity index 100% rename from server/src/users/mod.rs rename to src/users/mod.rs diff --git a/server/src/utils/actix.rs b/src/utils/actix.rs similarity index 100% rename from server/src/utils/actix.rs rename to src/utils/actix.rs diff --git a/server/src/utils/arrow/batch_adapter.rs b/src/utils/arrow/batch_adapter.rs similarity index 100% rename from server/src/utils/arrow/batch_adapter.rs rename to src/utils/arrow/batch_adapter.rs diff --git a/server/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs similarity index 100% rename from server/src/utils/arrow/flight.rs rename to src/utils/arrow/flight.rs diff --git a/server/src/utils/arrow/merged_reader.rs b/src/utils/arrow/merged_reader.rs similarity index 100% rename from server/src/utils/arrow/merged_reader.rs rename to src/utils/arrow/merged_reader.rs diff --git a/server/src/utils/arrow.rs b/src/utils/arrow/mod.rs similarity index 100% rename from server/src/utils/arrow.rs rename to src/utils/arrow/mod.rs diff --git a/server/src/utils/arrow/reverse_reader.rs b/src/utils/arrow/reverse_reader.rs similarity index 100% rename from server/src/utils/arrow/reverse_reader.rs rename to src/utils/arrow/reverse_reader.rs diff --git a/server/src/utils/header_parsing.rs b/src/utils/header_parsing.rs similarity index 100% rename from server/src/utils/header_parsing.rs rename to src/utils/header_parsing.rs diff --git a/server/src/utils/json/flatten.rs b/src/utils/json/flatten.rs similarity index 100% rename from server/src/utils/json/flatten.rs rename to src/utils/json/flatten.rs diff --git a/server/src/utils/json.rs b/src/utils/json/mod.rs similarity index 100% rename from server/src/utils/json.rs rename to src/utils/json/mod.rs diff --git a/server/src/utils.rs b/src/utils/mod.rs similarity index 100% rename from server/src/utils.rs rename to src/utils/mod.rs diff --git a/server/src/utils/uid.rs b/src/utils/uid.rs similarity index 100% rename from server/src/utils/uid.rs rename to src/utils/uid.rs diff --git a/server/src/utils/update.rs b/src/utils/update.rs similarity index 100% rename from server/src/utils/update.rs rename to src/utils/update.rs diff --git a/server/src/validator.rs b/src/validator.rs similarity index 100% rename from server/src/validator.rs rename to src/validator.rs From 6c8504fcde65dd11b7dd6f82db6361e5fe99cee6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 25 Nov 2024 13:08:59 +0530 Subject: [PATCH 02/16] ci: implement suggestion --- src/storage/store_metadata.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 54735ab71..d49cbd87f 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -65,8 +65,8 @@ pub struct StorageMetadata { pub default_role: Option, } -impl StorageMetadata { - pub fn new() -> Self { +impl Default for StorageMetadata { + fn default() -> Self { Self { version: CURRENT_STORAGE_METADATA_VERSION.to_string(), mode: CONFIG.storage_name.to_owned(), @@ -80,6 +80,9 @@ impl StorageMetadata { default_role: None, } } +} + +impl StorageMetadata { pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() @@ -169,7 +172,7 @@ pub async fn resolve_parseable_metadata( } EnvChange::CreateBoth => { create_dir_all(CONFIG.staging_dir())?; - let metadata = StorageMetadata::new(); + let metadata = StorageMetadata::default(); // new metadata needs to be set // if mode is query or all then both staging and remote match CONFIG.parseable.mode { From 5d08df61eee6a82bc4ea84ada7f4422a6cc4e995 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 25 Nov 2024 13:29:06 +0530 Subject: [PATCH 03/16] doc: fix code run in test --- Cargo.lock | 1 + Cargo.toml | 34 +++++++++++++++++----------------- src/utils/arrow/mod.rs | 6 ++++-- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb66004d6..90aa480ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3097,6 +3097,7 @@ dependencies = [ "actix-web-static-files", "anyhow", "argon2", + "arrow", "arrow-array", "arrow-flight", "arrow-ipc", diff --git a/Cargo.toml b/Cargo.toml index 4315f7f05..684c560fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ build = "build.rs" [dependencies] ### apache arrow/datafusion dependencies -# arrow = "51.0.0" arrow-schema = { version = "53.0.0", features = ["serde"] } arrow-array = { version = "53.0.0" } arrow-json = "53.0.0" @@ -18,8 +17,8 @@ arrow-select = "53.0.0" datafusion = "42.0.0" object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] } parquet = "53.0.0" -arrow-flight = { version = "53.0.0", features = [ "tls" ] } -tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] } +arrow-flight = { version = "53.0.0", features = ["tls"] } +tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] } tonic-web = "0.12.3" tower-http = { version = "0.6.1", features = ["cors"] } @@ -44,13 +43,13 @@ cookie = "0.18.1" chrono = "0.4" chrono-humanize = "0.2" clap = { version = "4.1", default-features = false, features = [ - "std", - "color", - "help", - "derive", - "env", - "cargo", - "error-context", + "std", + "color", + "help", + "derive", + "env", + "cargo", + "error-context", ] } clokwerk = "0.4" crossterm = "0.28.1" @@ -72,10 +71,10 @@ rand = "0.8.5" regex = "1.7.3" relative-path = { version = "1.7", features = ["serde"] } reqwest = { version = "0.11.27", default-features = false, features = [ - "rustls-tls", - "json", -] } # cannot update cause rustls is not latest `see rustls` -rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet + "rustls-tls", + "json", +] } # cannot update cause rustls is not latest `see rustls` +rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet rustls-pemfile = "2.1.2" semver = "1.0" serde = { version = "1.0", features = ["rc", "derive"] } @@ -85,9 +84,9 @@ sysinfo = "0.31.4" thiserror = "1.0.64" thread-priority = "1.0.0" tokio = { version = "1.28", default-features = false, features = [ - "sync", - "macros", - "fs", + "sync", + "macros", + "fs", ] } tokio-stream = { version = "0.1", features = ["fs"] } ulid = { version = "1.0", features = ["serde"] } @@ -120,6 +119,7 @@ prost-build = "0.13.3" [dev-dependencies] maplit = "1.0" rstest = "0.23.0" +arrow = "53.0.0" [package.metadata.parseable_ui] assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.11/build.zip" diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index f88be6048..87af65735 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -34,8 +34,9 @@ pub use merged_reader::MergedRecordReader; use serde_json::{Map, Value}; /// example function for concat recordbatch(may not work) -/// use arrow::record_batch::RecordBatch; -/// use arrow::error::Result; +/// ```rust +/// # use arrow::record_batch::RecordBatch; +/// # use arrow::error::Result; /// /// fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result { /// let schema = batch1.schema(); @@ -53,6 +54,7 @@ use serde_json::{Map, Value}; /// /// RecordBatch::try_new(schema.clone(), columns) /// } +/// ``` /// /// Replaces columns in a record batch with new arrays. From e3fa62977ad852ec35ebaa45ceff5a0718e06c34 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 2 Nov 2024 21:34:11 +0530 Subject: [PATCH 04/16] refactor: DRY `ParseableServer::start()` --- src/handlers/http/modal/ingest_server.rs | 169 +++++------------------ src/handlers/http/modal/mod.rs | 126 ++++++++++++++++- src/handlers/http/modal/query_server.rs | 158 ++++----------------- src/handlers/http/modal/server.rs | 154 ++++----------------- 4 files changed, 204 insertions(+), 403 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 1e0e9dd21..2cc0e4631 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -15,10 +15,17 @@ * along with this program. If not, see . * */ + +use super::ingest::ingestor_logstream; +use super::ingest::ingestor_rbac; +use super::ingest::ingestor_role; +use super::server::Server; +use super::IngestorMetadata; +use super::OpenIdClient; +use super::ParseableServer; use crate::analytics; use crate::banner; use crate::handlers::airplane; -use crate::handlers::http::health_check; use crate::handlers::http::ingest; use crate::handlers::http::logstream; use crate::handlers::http::middleware::DisAllowRootUser; @@ -38,27 +45,11 @@ use crate::storage::ObjectStorageError; use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::sync; -use std::sync::Arc; - -use super::ingest::ingestor_logstream; -use super::ingest::ingestor_rbac; -use super::ingest::ingestor_role; -use super::server::Server; -use super::ssl_acceptor::get_ssl_acceptor; -use super::IngestorMetadata; -use super::OpenIdClient; -use super::ParseableServer; - -use crate::{ - handlers::http::{base_path, cross_origin_config}, - option::CONFIG, -}; +use crate::{handlers::http::base_path, option::CONFIG}; use actix_web::body::MessageBody; -use actix_web::middleware::from_fn; +use actix_web::web; use actix_web::web::resource; use actix_web::Scope; -use actix_web::{web, App, HttpServer}; -use actix_web_prometheus::PrometheusMetrics; use anyhow::anyhow; use async_trait::async_trait; use base64::Engine; @@ -66,110 +57,32 @@ use bytes::Bytes; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; -use tokio::sync::{oneshot, Mutex}; /// ! have to use a guard before using it pub static INGESTOR_META: Lazy = Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json")); -#[derive(Default)] pub struct IngestServer; -#[async_trait(?Send)] +#[async_trait] impl ParseableServer for IngestServer { - // we dont need oidc client here its just here to satisfy the trait - async fn start( - &self, - prometheus: PrometheusMetrics, - _oidc_client: Option, - ) -> anyhow::Result<()> { - // set the ingestor metadata - self.set_ingestor_metadata().await?; - - // get the ssl stuff - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, - )?; - - // fn that creates the app - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|config| IngestServer::configure_routes(config, None)) - .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - // Create a channel to trigger server shutdown - let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); - let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - - // Clone the shutdown signal for the signal handler - let shutdown_signal = server_shutdown_signal.clone(); - - // Spawn the signal handler task - let signal_task = tokio::spawn(async move { - health_check::handle_signals(shutdown_signal).await; - log::info!("Received shutdown signal, notifying server to shut down..."); - }); - - // Create the HTTP server - let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) - .shutdown_timeout(60); - - // Start the server with or without TLS - let srv = if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - } else { - http_server.bind(&CONFIG.parseable.address)?.run() - }; - - // Graceful shutdown handling - let srv_handle = srv.handle(); - - let sync_task = tokio::spawn(async move { - // Wait for the shutdown signal - let _ = shutdown_rx.await; - - // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); - } else { - log::info!("Successfully synced all data to S3."); - } - - // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); - srv_handle.stop(true).await; - }); - - // Await the HTTP server to run - let server_result = srv.await; - - // Await the signal handler to ensure proper cleanup - if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); - } - - // Wait for the sync task to complete before exiting - if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); - } else { - log::info!("Sync task completed successfully."); - } - - // Return the result of the server - server_result?; - - Ok(()) + // configure the api routes + fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option) { + config + .service( + // Base path "{url}/api/v1" + web::scope(&base_path()) + .service(Server::get_ingest_factory()) + .service(Self::logstream_api()) + .service(Server::get_about_factory()) + .service(Self::analytics_factory()) + .service(Server::get_liveness_factory()) + .service(Self::get_user_webscope()) + .service(Self::get_user_role_webscope()) + .service(Server::get_metrics_webscope()) + .service(Server::get_readiness_factory()), + ) + .service(Server::get_ingest_otel_factory()); } /// implement the init method will just invoke the initialize method @@ -202,25 +115,6 @@ impl ParseableServer for IngestServer { } impl IngestServer { - // configure the api routes - fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option) { - config - .service( - // Base path "{url}/api/v1" - web::scope(&base_path()) - .service(Server::get_ingest_factory()) - .service(Self::logstream_api()) - .service(Server::get_about_factory()) - .service(Self::analytics_factory()) - .service(Server::get_liveness_factory()) - .service(Self::get_user_webscope()) - .service(Self::get_user_role_webscope()) - .service(Server::get_metrics_webscope()) - .service(Server::get_readiness_factory()), - ) - .service(Server::get_ingest_otel_factory()); - } - fn analytics_factory() -> Scope { web::scope("/analytics").service( // GET "/analytics" ==> Get analytics data @@ -480,7 +374,12 @@ impl IngestServer { tokio::spawn(airplane::server()); - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + // set the ingestor metadata + self.set_ingestor_metadata().await?; + + // TODO: figure out why we don't need this + // > we dont need oidc client here its just here to satisfy the trait + let app = self.start(prometheus, None); tokio::pin!(app); loop { diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 6f6d2bfd7..6ff7f71f5 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -26,14 +26,26 @@ pub mod utils; use std::sync::Arc; +use actix_web::middleware::from_fn; +use actix_web::web::ServiceConfig; +use actix_web::App; +use actix_web::HttpServer; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; -use openid::Discovered; - -use crate::oidc; use base64::Engine; +use openid::Discovered; use serde::Deserialize; use serde::Serialize; +use ssl_acceptor::get_ssl_acceptor; +use tokio::sync::{oneshot, Mutex}; + +use super::cross_origin_config; +use super::API_BASE_PATH; +use super::API_VERSION; +use crate::handlers::http::health_check; +use crate::oidc; +use crate::option::CONFIG; + pub type OpenIdClient = Arc>; // to be decided on what the Default version should be @@ -41,16 +53,118 @@ pub const DEFAULT_VERSION: &str = "v3"; include!(concat!(env!("OUT_DIR"), "/generated.rs")); -#[async_trait(?Send)] +#[async_trait] pub trait ParseableServer { - // async fn validate(&self) -> Result<(), ObjectStorageError>; + /// configure the router + fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) + where + Self: Sized; /// configure the server async fn start( &self, prometheus: PrometheusMetrics, oidc_client: Option, - ) -> anyhow::Result<()>; + ) -> anyhow::Result<()> + where + Self: Sized, + { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + + None => None, + }; + + // get the ssl stuff + let ssl = get_ssl_acceptor( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + &CONFIG.parseable.trusted_ca_certs_path, + )?; + + // fn that creates the app + let create_app_fn = move || { + App::new() + .wrap(prometheus.clone()) + .configure(|config| Self::configure_routes(config, oidc_client.clone())) + .wrap(from_fn(health_check::check_shutdown_middleware)) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) + .wrap(cross_origin_config()) + }; + + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task + let signal_task = tokio::spawn(async move { + health_check::handle_signals(shutdown_signal).await; + println!("Received shutdown signal, notifying server to shut down..."); + }); + + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { + http_server + .bind_rustls_0_22(&CONFIG.parseable.address, config)? + .run() + } else { + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + let sync_task = tokio::spawn(async move { + // Wait for the shutdown signal + let _ = shutdown_rx.await; + + // Perform S3 sync and wait for completion + log::info!("Starting data sync to S3..."); + if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + log::warn!("Failed to sync local data with object store. {:?}", e); + } else { + log::info!("Successfully synced all data to S3."); + } + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the HTTP server to run + let server_result = srv.await; + + // Await the signal handler to ensure proper cleanup + if let Err(e) = signal_task.await { + log::error!("Error in signal handler: {:?}", e); + } + + // Wait for the sync task to complete before exiting + if let Err(e) = sync_task.await { + log::error!("Error in sync task: {:?}", e); + } else { + log::info!("Sync task completed successfully."); + } + + // Return the result of the server + server_result?; + + Ok(()) + } async fn init(&self) -> anyhow::Result<()>; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 2b056d3e0..777d7fb4c 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -17,136 +17,55 @@ */ use crate::handlers::airplane; +use crate::handlers::http::base_path; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; use crate::handlers::http::{self, role}; -use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; -use crate::handlers::http::{health_check, logstream, MAX_EVENT_PAYLOAD_SIZE}; +use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; use crate::{analytics, banner, metrics, migration, rbac, storage}; -use actix_web::middleware::from_fn; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; -use actix_web::{App, HttpServer}; use async_trait::async_trait; -use std::sync::Arc; -use tokio::sync::{oneshot, Mutex}; use crate::{option::CONFIG, ParseableServer}; use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; use super::server::Server; -use super::ssl_acceptor::get_ssl_acceptor; use super::OpenIdClient; -#[derive(Default, Debug)] pub struct QueryServer; -#[async_trait(?Send)] +#[async_trait] impl ParseableServer for QueryServer { - async fn start( - &self, - prometheus: actix_web_prometheus::PrometheusMetrics, - oidc_client: Option, - ) -> anyhow::Result<()> { - let oidc_client = match oidc_client { - Some(config) => { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - Some(Arc::new(client)) - } - - None => None, - }; - - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, - )?; - - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|config| QueryServer::configure_routes(config, oidc_client.clone())) - .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - // Create a channel to trigger server shutdown - let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); - let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - - // Clone the shutdown signal for the signal handler - let shutdown_signal = server_shutdown_signal.clone(); - - // Spawn the signal handler task - let signal_task = tokio::spawn(async move { - health_check::handle_signals(shutdown_signal).await; - log::info!("Received shutdown signal, notifying server to shut down..."); - }); - - // Create the HTTP server - let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) - .shutdown_timeout(120); - - // Start the server with or without TLS - let srv = if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - } else { - http_server.bind(&CONFIG.parseable.address)?.run() - }; - - // Graceful shutdown handling - let srv_handle = srv.handle(); - - let sync_task = tokio::spawn(async move { - // Wait for the shutdown signal - let _ = shutdown_rx.await; - - // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); - } else { - log::info!("Successfully synced all data to S3."); - } - - // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); - srv_handle.stop(true).await; - }); - - // Await the HTTP server to run - let server_result = srv.await; - - // Await the signal handler to ensure proper cleanup - if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); - } - - // Wait for the sync task to complete before exiting - if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); - } else { - log::info!("Sync task completed successfully."); - } - - // Return the result of the server - server_result?; - - Ok(()) + // configure the api routes + fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) { + config + .service( + web::scope(&base_path()) + // POST "/query" ==> Get results of the SQL query passed in request body + .service(Server::get_query_factory()) + .service(Server::get_trino_factory()) + .service(Server::get_cache_webscope()) + .service(Server::get_liveness_factory()) + .service(Server::get_readiness_factory()) + .service(Server::get_about_factory()) + .service(Self::get_logstream_webscope()) + .service(Self::get_user_webscope()) + .service(Server::get_dashboards_webscope()) + .service(Server::get_filters_webscope()) + .service(Server::get_llm_webscope()) + .service(Server::get_oauth_webscope(oidc_client)) + .service(Self::get_user_role_webscope()) + .service(Server::get_metrics_webscope()) + .service(Self::get_cluster_web_scope()), + ) + .service(Server::get_generated()); } /// implementation of init should just invoke a call to initialize @@ -176,31 +95,6 @@ impl ParseableServer for QueryServer { } impl QueryServer { - // configure the api routes - fn configure_routes(config: &mut ServiceConfig, oidc_client: Option) { - config - .service( - web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body - .service(Server::get_query_factory()) - .service(Server::get_trino_factory()) - .service(Server::get_cache_webscope()) - .service(Server::get_liveness_factory()) - .service(Server::get_readiness_factory()) - .service(Server::get_about_factory()) - .service(Self::get_logstream_webscope()) - .service(Self::get_user_webscope()) - .service(Server::get_dashboards_webscope()) - .service(Server::get_filters_webscope()) - .service(Server::get_llm_webscope()) - .service(Server::get_oauth_webscope(oidc_client)) - .service(Self::get_user_role_webscope()) - .service(Server::get_metrics_webscope()) - .service(Self::get_cluster_web_scope()), - ) - .service(Server::get_generated()); - } - // get the role webscope fn get_user_role_webscope() -> Scope { web::scope("/role") diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index ba1ab055c..d5e716a90 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -27,8 +27,6 @@ use crate::handlers::http::query; use crate::handlers::http::trino; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; -use crate::handlers::http::API_BASE_PATH; -use crate::handlers::http::API_VERSION; use crate::hottier::HotTierManager; use crate::localcache::LocalCacheManager; use crate::metrics; @@ -38,21 +36,17 @@ use crate::storage; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; -use actix_web::middleware::from_fn; -use std::sync::Arc; -use tokio::sync::{oneshot, Mutex}; +use actix_web::web; use actix_web::web::resource; use actix_web::Resource; use actix_web::Scope; -use actix_web::{web, App, HttpServer}; -use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; use crate::{ handlers::http::{ - self, cross_origin_config, ingest, llm, logstream, + self, ingest, llm, logstream, middleware::{DisAllowRootUser, RouteExt}, oidc, role, MAX_EVENT_PAYLOAD_SIZE, }, @@ -62,133 +56,13 @@ use crate::{ // use super::generate; use super::generate; -use super::ssl_acceptor::get_ssl_acceptor; use super::OpenIdClient; use super::ParseableServer; -#[derive(Default)] + pub struct Server; -#[async_trait(?Send)] +#[async_trait] impl ParseableServer for Server { - async fn start( - &self, - prometheus: PrometheusMetrics, - oidc_client: Option, - ) -> anyhow::Result<()> { - let oidc_client = match oidc_client { - Some(config) => { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - Some(Arc::new(client)) - } - None => None, - }; - - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone())) - .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, - )?; - - // Create a channel to trigger server shutdown - let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); - let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - - // Clone the shutdown signal for the signal handler - let shutdown_signal = server_shutdown_signal.clone(); - - // Spawn the signal handler task - let signal_task = tokio::spawn(async move { - health_check::handle_signals(shutdown_signal).await; - log::info!("Received shutdown signal, notifying server to shut down..."); - }); - - // Create the HTTP server - let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) - .shutdown_timeout(60); - - // Start the server with or without TLS - let srv = if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - } else { - http_server.bind(&CONFIG.parseable.address)?.run() - }; - - // Graceful shutdown handling - let srv_handle = srv.handle(); - - let sync_task = tokio::spawn(async move { - // Wait for the shutdown signal - let _ = shutdown_rx.await; - - // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); - } else { - log::info!("Successfully synced all data to S3."); - } - - // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); - srv_handle.stop(true).await; - }); - - // Await the HTTP server to run - let server_result = srv.await; - - // Await the signal handler to ensure proper cleanup - if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); - } - - // Wait for the sync task to complete before exiting - if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); - } else { - log::info!("Sync task completed successfully."); - } - - // Return the result of the server - server_result?; - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - metadata.set_global(); - self.initialize().await?; - Ok(()) - } - - fn validate(&self) -> anyhow::Result<()> { - Ok(()) - } -} - -impl Server { fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { // there might be a bug in the configure routes method config @@ -215,6 +89,26 @@ impl Server { .service(Self::get_generated()); } + /// implementation of init should just invoke a call to initialize + async fn init(&self) -> anyhow::Result<()> { + self.validate()?; + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + rbac::map::init(&metadata); + metadata.set_global(); + self.initialize().await?; + Ok(()) + } + + fn validate(&self) -> anyhow::Result<()> { + Ok(()) + } +} + +impl Server { // get the trino factory pub fn get_trino_factory() -> Resource { web::resource("/trinoquery") From 20e67ee1f99f7d51c651b92831c567b8cfef7e8f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 2 Nov 2024 23:55:19 +0530 Subject: [PATCH 05/16] style: use box pointers --- src/main.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index b689533e6..3b4397772 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,8 +16,6 @@ * */ -use std::sync::Arc; - use parseable::{ option::{Mode, CONFIG}, IngestServer, ParseableServer, QueryServer, Server, @@ -28,12 +26,10 @@ async fn main() -> anyhow::Result<()> { env_logger::init(); // these are empty ptrs so mem footprint should be minimal - let server: Arc = match CONFIG.parseable.mode { - Mode::Query => Arc::new(QueryServer), - - Mode::Ingest => Arc::new(IngestServer), - - Mode::All => Arc::new(Server), + let server: Box = match CONFIG.parseable.mode { + Mode::Query => Box::new(QueryServer), + Mode::Ingest => Box::new(IngestServer), + Mode::All => Box::new(Server), }; server.init().await?; From 86b574a2d4ca630004cd3e3e5763c15bd3d89194 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 2 Nov 2024 22:02:55 +0530 Subject: [PATCH 06/16] doc: explain `None` --- src/handlers/http/modal/ingest_server.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 2cc0e4631..b61bbed59 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -377,8 +377,7 @@ impl IngestServer { // set the ingestor metadata self.set_ingestor_metadata().await?; - // TODO: figure out why we don't need this - // > we dont need oidc client here its just here to satisfy the trait + // Ingestors shouldn't have to deal with OpenId auth flow let app = self.start(prometheus, None); tokio::pin!(app); From bbc48ff4adb797e8311db21e6030b02dcd3403ee Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 5 Nov 2024 10:31:30 +0530 Subject: [PATCH 07/16] refactor: avoid unwrap --- src/storage/store_metadata.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index d49cbd87f..ec8673b59 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -106,13 +106,10 @@ pub async fn resolve_parseable_metadata( parseable_metadata: &Option, ) -> Result { let staging_metadata = get_staging_metadata()?; - let mut remote_metadata: Option = None; - if parseable_metadata.is_some() { - remote_metadata = Some( - serde_json::from_slice(parseable_metadata.as_ref().unwrap()) - .expect("parseable config is valid json"), - ); - } + let remote_metadata = parseable_metadata + .as_ref() + .map(|meta| serde_json::from_slice(&meta).expect("parseable config is valid json")); + // Env Change needs to be updated let check = determine_environment(staging_metadata, remote_metadata); // flags for if metadata needs to be synced From 8043e1b82f9dbee537a3635edd89af4f5c49ebcc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 6 Nov 2024 12:39:08 +0530 Subject: [PATCH 08/16] doc: move comment --- src/handlers/http/modal/query_server.rs | 1 - src/handlers/http/modal/server.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 777d7fb4c..a86f0cafa 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -48,7 +48,6 @@ impl ParseableServer for QueryServer { config .service( web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body .service(Server::get_query_factory()) .service(Server::get_trino_factory()) .service(Server::get_cache_webscope()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d5e716a90..01f9f9c37 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -68,7 +68,6 @@ impl ParseableServer for Server { config .service( web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body .service(Self::get_query_factory()) .service(Self::get_trino_factory()) .service(Self::get_cache_webscope()) @@ -186,6 +185,7 @@ impl Server { } // get the query factory + // POST "/query" ==> Get results of the SQL query passed in request body pub fn get_query_factory() -> Resource { web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) } From 9d6ea968fb11ab2ae230e4a92d88e96bb5ef1db5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 6 Nov 2024 12:45:12 +0530 Subject: [PATCH 09/16] refactor: merge init and initialize --- src/handlers/http/modal/ingest_server.rs | 117 ++++++++++--------- src/handlers/http/modal/query_server.rs | 136 +++++++++++----------- src/handlers/http/modal/server.rs | 138 +++++++++++------------ 3 files changed, 190 insertions(+), 201 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index b61bbed59..64f340991 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -85,7 +85,7 @@ impl ParseableServer for IngestServer { .service(Server::get_ingest_otel_factory()); } - /// implement the init method will just invoke the initialize method + /// configure the server and start an instance to ingest data async fn init(&self) -> anyhow::Result<()> { self.validate()?; @@ -99,7 +99,62 @@ impl ParseableServer for IngestServer { rbac::map::init(&metadata); // set the info in the global metadata metadata.set_global(); - self.initialize().await + + // ! Undefined and Untested behaviour + if let Some(cache_manager) = LocalCacheManager::global() { + cache_manager + .validate(CONFIG.parseable.local_cache_size) + .await?; + }; + + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + tokio::spawn(airplane::server()); + + // set the ingestor metadata + self.set_ingestor_metadata().await?; + + // Ingestors shouldn't have to deal with OpenId auth flow + let app = self.start(prometheus, None); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } } fn validate(&self) -> anyhow::Result<()> { @@ -353,62 +408,4 @@ impl IngestServer { Ok(()) } - - async fn initialize(&self) -> anyhow::Result<()> { - // ! Undefined and Untested behaviour - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - tokio::spawn(airplane::server()); - - // set the ingestor metadata - self.set_ingestor_metadata().await?; - - // Ingestors shouldn't have to deal with OpenId auth flow - let app = self.start(prometheus, None); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index a86f0cafa..5d2e24836 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -67,7 +67,7 @@ impl ParseableServer for QueryServer { .service(Server::get_generated()); } - /// implementation of init should just invoke a call to initialize + /// initialize the server, run migrations as needed and start an instance async fn init(&self) -> anyhow::Result<()> { self.validate()?; migration::run_file_migration(&CONFIG).await?; @@ -79,7 +79,71 @@ impl ParseableServer for QueryServer { rbac::map::init(&metadata); // keep metadata info in mem metadata.set_global(); - self.initialize().await + + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + //create internal stream at server start + create_internal_stream_if_not_exists().await?; + + FILTERS.load().await?; + DASHBOARDS.load().await?; + // track all parquet files already in the data directory + storage::retention::load_retention_from_global(); + + // all internal data structures populated now. + // start the analytics scheduler if enabled + if CONFIG.parseable.send_analytics { + analytics::init_analytics_scheduler()?; + } + + if matches!(init_cluster_metrics_schedular(), Ok(())) { + log::info!("Cluster metrics scheduler started successfully"); + } + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.put_internal_stream_hot_tier().await?; + hot_tier_manager.download_from_s3()?; + }; + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } } fn validate(&self) -> anyhow::Result<()> { @@ -332,72 +396,4 @@ impl QueryServer { ), ) } - - /// initialize the server, run migrations as needed and start the server - async fn initialize(&self) -> anyhow::Result<()> { - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - //create internal stream at server start - create_internal_stream_if_not_exists().await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - // track all parquet files already in the data directory - storage::retention::load_retention_from_global(); - - // all internal data structures populated now. - // start the analytics scheduler if enabled - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - if matches!(init_cluster_metrics_schedular(), Ok(())) { - log::info!("Cluster metrics scheduler started successfully"); - } - if let Some(hot_tier_manager) = HotTierManager::global() { - hot_tier_manager.put_internal_stream_hot_tier().await?; - hot_tier_manager.download_from_s3()?; - }; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - tokio::spawn(airplane::server()); - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining localsync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 01f9f9c37..186e22c2a 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -88,7 +88,7 @@ impl ParseableServer for Server { .service(Self::get_generated()); } - /// implementation of init should just invoke a call to initialize + /// configure the server and start an instance of the single server setup async fn init(&self) -> anyhow::Result<()> { self.validate()?; migration::run_file_migration(&CONFIG).await?; @@ -98,8 +98,72 @@ impl ParseableServer for Server { banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); - self.initialize().await?; - Ok(()) + + if let Some(cache_manager) = LocalCacheManager::global() { + cache_manager + .validate(CONFIG.parseable.local_cache_size) + .await?; + }; + + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + FILTERS.load().await?; + DASHBOARDS.load().await?; + + storage::retention::load_retention_from_global(); + + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.download_from_s3()?; + }; + + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + if CONFIG.parseable.send_analytics { + analytics::init_analytics_scheduler()?; + } + + tokio::spawn(handlers::livetail::server()); + tokio::spawn(handlers::airplane::server()); + + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } } fn validate(&self) -> anyhow::Result<()> { @@ -485,72 +549,4 @@ impl Server { pub fn get_generated() -> ResourceFiles { ResourceFiles::new("/", generate()).resolve_not_found_to_root() } - - async fn initialize(&self) -> anyhow::Result<()> { - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - - storage::retention::load_retention_from_global(); - - if let Some(hot_tier_manager) = HotTierManager::global() { - hot_tier_manager.download_from_s3()?; - }; - - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - tokio::spawn(handlers::livetail::server()); - tokio::spawn(handlers::airplane::server()); - - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } } From 50ab1cb23618bafaa5a45b78b4bfc2015cc79e0b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 6 Nov 2024 12:57:53 +0530 Subject: [PATCH 10/16] refactor: separate out metadata loading stage --- src/handlers/http/modal/ingest_server.rs | 34 ++++++++--------------- src/handlers/http/modal/mod.rs | 11 +++++--- src/handlers/http/modal/query_server.rs | 35 ++++++++++-------------- src/handlers/http/modal/server.rs | 20 +++++--------- src/lib.rs | 6 ++-- src/main.rs | 12 +++++++- src/storage/store_metadata.rs | 2 +- 7 files changed, 56 insertions(+), 64 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 64f340991..2bc6db145 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -24,7 +24,6 @@ use super::IngestorMetadata; use super::OpenIdClient; use super::ParseableServer; use crate::analytics; -use crate::banner; use crate::handlers::airplane; use crate::handlers::http::ingest; use crate::handlers::http::logstream; @@ -35,9 +34,7 @@ use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; use crate::migration::metadata_migration::migrate_ingester_metadata; -use crate::rbac; use crate::rbac::role::Action; -use crate::storage; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::object_storage::parseable_json_path; use crate::storage::staging; @@ -85,21 +82,25 @@ impl ParseableServer for IngestServer { .service(Server::get_ingest_otel_factory()); } - /// configure the server and start an instance to ingest data - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; + async fn load_metadata(&self) -> anyhow::Result> { + // parseable can't use local storage for persistence when running a distributed setup + if CONFIG.get_storage_mode_string() == "Local drive" { + return Err(anyhow::Error::msg( + // Error Message can be better + "Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.", + )); + } // check for querier state. Is it there, or was it there in the past let parseable_json = self.check_querier_state().await?; // to get the .parseable.json file in staging self.validate_credentials().await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - // set the info in the global metadata - metadata.set_global(); + Ok(parseable_json) + } + /// configure the server and start an instance to ingest data + async fn init(&self) -> anyhow::Result<()> { // ! Undefined and Untested behaviour if let Some(cache_manager) = LocalCacheManager::global() { cache_manager @@ -156,17 +157,6 @@ impl ParseableServer for IngestServer { }; } } - - fn validate(&self) -> anyhow::Result<()> { - if CONFIG.get_storage_mode_string() == "Local drive" { - return Err(anyhow::Error::msg( - // Error Message can be better - "Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.", - )); - } - - Ok(()) - } } impl IngestServer { diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 6ff7f71f5..57d5313f2 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -33,6 +33,7 @@ use actix_web::HttpServer; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use base64::Engine; +use bytes::Bytes; use openid::Discovered; use serde::Deserialize; use serde::Serialize; @@ -60,6 +61,12 @@ pub trait ParseableServer { where Self: Sized; + /// load metadata/configuration from persistence for previous sessions of parseable + async fn load_metadata(&self) -> anyhow::Result>; + + /// code that describes starting and setup procedures for each type of server + async fn init(&self) -> anyhow::Result<()>; + /// configure the server async fn start( &self, @@ -165,10 +172,6 @@ pub trait ParseableServer { Ok(()) } - - async fn init(&self) -> anyhow::Result<()>; - - fn validate(&self) -> anyhow::Result<()>; } #[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq)] diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 5d2e24836..c0a9b4dbf 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -28,10 +28,11 @@ use crate::rbac::role::Action; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; -use crate::{analytics, banner, metrics, migration, rbac, storage}; +use crate::{analytics, metrics, migration, storage}; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; use async_trait::async_trait; +use bytes::Bytes; use crate::{option::CONFIG, ParseableServer}; @@ -67,19 +68,23 @@ impl ParseableServer for QueryServer { .service(Server::get_generated()); } - /// initialize the server, run migrations as needed and start an instance - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; + async fn load_metadata(&self) -> anyhow::Result> { + // parseable can't use local storage for persistence when running a distributed setup + if CONFIG.get_storage_mode_string() == "Local drive" { + return Err(anyhow::anyhow!( + "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", + )); + } + migration::run_file_migration(&CONFIG).await?; let parseable_json = CONFIG.validate_storage().await?; migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - // initialize the rbac map - rbac::map::init(&metadata); - // keep metadata info in mem - metadata.set_global(); + Ok(parseable_json) + } + + /// initialize the server, run migrations as needed and start an instance + async fn init(&self) -> anyhow::Result<()> { let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); @@ -145,16 +150,6 @@ impl ParseableServer for QueryServer { }; } } - - fn validate(&self) -> anyhow::Result<()> { - if CONFIG.get_storage_mode_string() == "Local drive" { - return Err(anyhow::anyhow!( - "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", - )); - } - - Ok(()) - } } impl QueryServer { diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 186e22c2a..48e931619 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -17,7 +17,6 @@ */ use crate::analytics; -use crate::banner; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; @@ -31,7 +30,6 @@ use crate::hottier::HotTierManager; use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; -use crate::rbac; use crate::storage; use crate::sync; use crate::users::dashboards::DASHBOARDS; @@ -43,6 +41,7 @@ use actix_web::Resource; use actix_web::Scope; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; +use bytes::Bytes; use crate::{ handlers::http::{ @@ -88,17 +87,16 @@ impl ParseableServer for Server { .service(Self::get_generated()); } - /// configure the server and start an instance of the single server setup - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; + async fn load_metadata(&self) -> anyhow::Result> { migration::run_file_migration(&CONFIG).await?; let parseable_json = CONFIG.validate_storage().await?; migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - metadata.set_global(); + Ok(parseable_json) + } + + // configure the server and start an instance of the single server setup + async fn init(&self) -> anyhow::Result<()> { if let Some(cache_manager) = LocalCacheManager::global() { cache_manager .validate(CONFIG.parseable.local_cache_size) @@ -165,10 +163,6 @@ impl ParseableServer for Server { }; } } - - fn validate(&self) -> anyhow::Result<()> { - Ok(()) - } } impl Server { diff --git a/src/lib.rs b/src/lib.rs index 857542351..d84aca6df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ mod about; mod alerts; mod analytics; -mod banner; +pub mod banner; mod catalog; mod cli; mod event; @@ -34,11 +34,11 @@ mod oidc; pub mod option; mod query; mod querycache; -mod rbac; +pub mod rbac; mod response; mod static_schema; mod stats; -mod storage; +pub mod storage; mod sync; mod users; mod utils; diff --git a/src/main.rs b/src/main.rs index 3b4397772..9399c52e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,8 +17,9 @@ */ use parseable::{ + banner, option::{Mode, CONFIG}, - IngestServer, ParseableServer, QueryServer, Server, + rbac, storage, IngestServer, ParseableServer, QueryServer, Server, }; #[actix_web::main] @@ -32,6 +33,15 @@ async fn main() -> anyhow::Result<()> { Mode::All => Box::new(Server), }; + // load metadata from persistence + let parseable_json = server.load_metadata().await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + // initialize the rbac map + rbac::map::init(&metadata); + // keep metadata info in mem + metadata.set_global(); + server.init().await?; Ok(()) diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index ec8673b59..dac0d26be 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -108,7 +108,7 @@ pub async fn resolve_parseable_metadata( let staging_metadata = get_staging_metadata()?; let remote_metadata = parseable_metadata .as_ref() - .map(|meta| serde_json::from_slice(&meta).expect("parseable config is valid json")); + .map(|meta| serde_json::from_slice(meta).expect("parseable config is valid json")); // Env Change needs to be updated let check = determine_environment(staging_metadata, remote_metadata); From d1d901a288fd9c788d21b4ae49f0fe63501d9727 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 6 Nov 2024 14:39:53 +0530 Subject: [PATCH 11/16] doc: improve error message --- src/handlers/http/modal/ingest_server.rs | 3 +-- src/handlers/http/modal/query_server.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 2bc6db145..0e1226b83 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -86,8 +86,7 @@ impl ParseableServer for IngestServer { // parseable can't use local storage for persistence when running a distributed setup if CONFIG.get_storage_mode_string() == "Local drive" { return Err(anyhow::Error::msg( - // Error Message can be better - "Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.", + "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", )); } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c0a9b4dbf..ffde5fb11 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -72,7 +72,7 @@ impl ParseableServer for QueryServer { // parseable can't use local storage for persistence when running a distributed setup if CONFIG.get_storage_mode_string() == "Local drive" { return Err(anyhow::anyhow!( - "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", + "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", )); } From fc6e2a2f1bbc9234d3a7562a7df0820c1b548745 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 26 Nov 2024 14:02:48 +0530 Subject: [PATCH 12/16] refactor: `ObjectStorage: Send` --- src/catalog/mod.rs | 8 ++++---- src/handlers/http/modal/ingest/ingestor_logstream.rs | 2 +- src/hottier.rs | 2 +- src/migration/mod.rs | 6 ++---- src/option.rs | 2 +- src/query/listing_table_builder.rs | 2 +- src/query/stream_schema_provider.rs | 4 ++-- src/stats.rs | 2 +- src/storage/azure_blob.rs | 2 +- src/storage/localfs.rs | 2 +- src/storage/object_storage.rs | 6 +++--- src/storage/s3.rs | 2 +- 12 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 19a557647..5c502ac89 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -101,7 +101,7 @@ fn get_file_bounds( } pub async fn update_snapshot( - storage: Arc, + storage: Arc, stream_name: &str, change: manifest::File, ) -> Result<(), ObjectStorageError> { @@ -239,7 +239,7 @@ pub async fn update_snapshot( async fn create_manifest( lower_bound: DateTime, change: manifest::File, - storage: Arc, + storage: Arc, stream_name: &str, update_snapshot: bool, mut meta: ObjectStoreFormat, @@ -318,7 +318,7 @@ async fn create_manifest( } pub async fn remove_manifest_from_snapshot( - storage: Arc, + storage: Arc, stream_name: &str, dates: Vec, ) -> Result, ObjectStorageError> { @@ -343,7 +343,7 @@ pub async fn remove_manifest_from_snapshot( } pub async fn get_first_event( - storage: Arc, + storage: Arc, stream_name: &str, dates: Vec, ) -> Result, ObjectStorageError> { diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index 88ad68765..16711ebbb 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -115,7 +115,7 @@ pub async fn put_enable_cache( } metadata::STREAM_INFO .upsert_stream_info( - &*storage, + storage.as_ref(), LogStream { name: stream_name.clone().to_owned(), }, diff --git a/src/hottier.rs b/src/hottier.rs index 0528a1228..b6f29f609 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -289,7 +289,7 @@ impl HotTierManager { stream: &str, manifest_files_to_download: &mut BTreeMap>, parquet_file_size: &mut u64, - object_store: Arc, + object_store: Arc, ) -> Result<(), HotTierError> { if manifest_files_to_download.is_empty() { return Ok(()); diff --git a/src/migration/mod.rs b/src/migration/mod.rs index c57eafcff..53ab5a511 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -357,7 +357,7 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { } async fn run_meta_file_migration( - object_store: &Arc, + object_store: &Arc, old_meta_file_path: RelativePathBuf, ) -> anyhow::Result<()> { // get the list of all meta files @@ -388,9 +388,7 @@ async fn run_meta_file_migration( Ok(()) } -async fn run_stream_files_migration( - object_store: &Arc, -) -> anyhow::Result<()> { +async fn run_stream_files_migration(object_store: &Arc) -> anyhow::Result<()> { let streams = object_store .list_old_streams() .await? diff --git a/src/option.rs b/src/option.rs index 73b701c6e..be4a8cd08 100644 --- a/src/option.rs +++ b/src/option.rs @@ -39,7 +39,7 @@ pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); #[derive(Debug)] pub struct Config { pub parseable: Cli, - storage: Arc, + storage: Arc, pub storage_name: &'static str, } diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs index 71a61998c..685a34a4b 100644 --- a/src/query/listing_table_builder.rs +++ b/src/query/listing_table_builder.rs @@ -56,7 +56,7 @@ impl ListingTableBuilder { pub async fn populate_via_listing( self, - storage: Arc, + storage: Arc, client: Arc, time_filters: &[PartialTimeFilter], ) -> Result { diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index d5ccea485..f27cb6998 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -75,7 +75,7 @@ use crate::catalog::Snapshot as CatalogSnapshot; // schema provider for stream based on global data pub struct GlobalSchemaProvider { - pub storage: Arc, + pub storage: Arc, } #[async_trait::async_trait] @@ -614,7 +614,7 @@ async fn get_hottier_exectuion_plan( #[allow(clippy::too_many_arguments)] async fn legacy_listing_table( stream: String, - glob_storage: Arc, + glob_storage: Arc, object_store: Arc, time_filters: &[PartialTimeFilter], schema: Arc, diff --git a/src/stats.rs b/src/stats.rs index b7845ecc9..52ffc3b24 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -100,7 +100,7 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option, + storage: Arc, stream_name: &str, meta: ObjectStoreFormat, dates: Vec, diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index c5491be6f..d50e2d901 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -163,7 +163,7 @@ impl ObjectStorageProvider for AzureBlobConfig { RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) } - fn get_object_store(&self) -> Arc { + fn get_object_store(&self) -> Arc { let azure = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index a84247f0b..b3d3e09cd 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -67,7 +67,7 @@ impl ObjectStorageProvider for FSConfig { RuntimeConfig::new() } - fn get_object_store(&self) -> Arc { + fn get_object_store(&self) -> Arc { Arc::new(LocalFS::new(self.root.clone())) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index ff2a56953..78f51685d 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -58,15 +58,15 @@ use std::{ time::{Duration, Instant}, }; -pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { +pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> RuntimeConfig; - fn get_object_store(&self) -> Arc; + fn get_object_store(&self) -> Arc; fn get_endpoint(&self) -> String; fn register_store_metrics(&self, handler: &PrometheusMetrics); } #[async_trait] -pub trait ObjectStorage: Sync + 'static { +pub trait ObjectStorage: Send + Sync + 'static { async fn get_object(&self, path: &RelativePath) -> Result; // TODO: make the filter function optional as we may want to get all objects async fn get_objects( diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 6a546a148..89f5b361d 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -289,7 +289,7 @@ impl ObjectStorageProvider for S3Config { RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) } - fn get_object_store(&self) -> Arc { + fn get_object_store(&self) -> Arc { let s3 = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit From 095f720e334befd98cd9f9208200834256a6bb59 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 26 Nov 2024 18:41:16 +0530 Subject: [PATCH 13/16] rm unnecessary trait constraint --- src/option.rs | 2 +- src/query/mod.rs | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/option.rs b/src/option.rs index be4a8cd08..fb9f3c75c 100644 --- a/src/option.rs +++ b/src/option.rs @@ -170,7 +170,7 @@ Cloud Native, log analytics platform for modern applications."#, Err(ObjectStorageError::Custom(format!("Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))) } - pub fn storage(&self) -> Arc { + pub fn storage(&self) -> Arc { self.storage.clone() } diff --git a/src/query/mod.rs b/src/query/mod.rs index 24dc39d58..b41a066f8 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -61,9 +61,7 @@ pub struct Query { impl Query { // create session context for this query - pub fn create_session_context( - storage: Arc, - ) -> SessionContext { + pub fn create_session_context(storage: Arc) -> SessionContext { let runtime_config = storage .get_datafusion_runtime() .with_disk_manager(DiskManagerConfig::NewOs); From 24caa3463a9f648a811a6b880b2e6c930f85181a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 27 Nov 2024 04:02:27 +0530 Subject: [PATCH 14/16] make pub --- src/lib.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d84aca6df..140c32dcc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,18 +18,18 @@ mod about; mod alerts; -mod analytics; +pub mod analytics; pub mod banner; mod catalog; mod cli; mod event; -mod handlers; -mod hottier; +pub mod handlers; +pub mod hottier; mod livetail; -mod localcache; +pub mod localcache; mod metadata; -mod metrics; -mod migration; +pub mod metrics; +pub mod migration; mod oidc; pub mod option; mod query; @@ -39,8 +39,8 @@ mod response; mod static_schema; mod stats; pub mod storage; -mod sync; -mod users; +pub mod sync; +pub mod users; mod utils; mod validator; From 7fad4e9997b4afedf9318d738b07a6a33fc551fa Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 27 Nov 2024 04:16:58 +0530 Subject: [PATCH 15/16] refactor: check env once --- src/about.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/about.rs b/src/about.rs index 5a1d20e03..1897e1299 100644 --- a/src/about.rs +++ b/src/about.rs @@ -24,24 +24,26 @@ use crate::utils::update::{self, LatestRelease}; use chrono::Duration; use chrono_humanize::{Accuracy, Tense}; use crossterm::style::Stylize; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use std::env; use std::path::Path; use sysinfo::System; use ulid::Ulid; + // Expose some static variables for internal usage pub static LATEST_RELEASE: OnceCell> = OnceCell::new(); -static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST"; - -fn is_docker() -> bool { - Path::new("/.dockerenv").exists() -} +static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST"; fn is_k8s() -> bool { env::var(K8S_ENV_TO_CHECK).is_ok() } -pub fn platform() -> &'static str { +static DOCKERENV_FILE: &str = "/.dockerenv"; +fn is_docker() -> bool { + Path::new(DOCKERENV_FILE).exists() +} + +static PLATFORM: Lazy<&'static str> = Lazy::new(|| { if is_k8s() { "Kubernetes" } else if is_docker() { @@ -49,6 +51,10 @@ pub fn platform() -> &'static str { } else { "Native" } +}); + +pub fn platform() -> &'static str { + PLATFORM.as_ref() } pub fn set_latest_release(latest_release: Option) { From cc5f387969ade91b7a2a9a6cd8aa721ff94d3dda Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 27 Nov 2024 11:15:58 +0530 Subject: [PATCH 16/16] make more pub --- src/handlers/http/mod.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 2a2279800..6ccdaf3cc 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -25,28 +25,28 @@ use crate::option::CONFIG; use self::{cluster::get_ingestor_info, query::Query}; -pub(crate) mod about; -mod cache; +pub mod about; +pub mod cache; pub mod cluster; -pub(crate) mod health_check; -pub(crate) mod ingest; +pub mod health_check; +pub mod ingest; mod kinesis; -pub(crate) mod llm; -pub(crate) mod logstream; -pub(crate) mod middleware; +pub mod llm; +pub mod logstream; +pub mod middleware; pub mod modal; -pub(crate) mod oidc; +pub mod oidc; mod otel; -pub(crate) mod query; -pub(crate) mod rbac; -pub(crate) mod role; -pub(crate) mod trino; +pub mod query; +pub mod rbac; +pub mod role; +pub mod trino; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; pub const API_VERSION: &str = "v1"; -pub(crate) fn base_path() -> String { +pub fn base_path() -> String { format!("/{API_BASE_PATH}/{API_VERSION}") }