
Commit 1a3bffa

Author: Devdutt Shenoi
Merge remote-tracking branch 'origin/main'
2 parents: 6d8d027 + 1923095

8 files changed: +142 additions, -106 deletions

Cargo.lock

Lines changed: 1 addition & 1 deletion
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 6 additions & 6 deletions

@@ -1,6 +1,6 @@
 [package]
 name = "parseable"
-version = "1.7.1"
+version = "1.7.2"
 authors = ["Parseable Team <[email protected]>"]
 edition = "2021"
 rust-version = "1.83.0"
@@ -99,9 +99,6 @@ sysinfo = "0.31.4"
 thread-priority = "1.0.0"
 uptime_lib = "0.3.0"
 
-# Kafka
-rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
-
 # Utility Libraries
 anyhow = { version = "1.0", features = ["backtrace"] }
 bytes = "1.4"
@@ -142,8 +139,8 @@ rstest = "0.23.0"
 arrow = "53.0.0"
 
 [package.metadata.parseable_ui]
-assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.15/build.zip"
-assets-sha1 = "1ef784c0a26d6ec9facf5ed59c6c391612525b72"
+assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.16/build.zip"
+assets-sha1 = "cb9af17fc1af07e590e839fc0ef4db18c323fc48"
 
 [features]
 debug = []
@@ -156,3 +153,6 @@ codegen-units = 1
 # adding rdkafka here because, for unsupported platforms, cargo skips other deps which come after this
 [target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
 rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
+
+[target.'cfg(all(target_os = "macos", target_arch = "aarch64"))'.dependencies]
+rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
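
The net effect in Cargo.toml: a version bump to 1.7.2, updated console assets, and rdkafka moved out of the unconditional dependency list so the native librdkafka bindings are only built where they are known to compile, now macOS aarch64 as well as Linux x86_64. As a small illustration (not part of the commit), Rust code can evaluate the same predicate that gates the dependency with the cfg! macro:

    // Hypothetical sketch: cfg! compiles the same predicate used in
    // Cargo.toml down to a plain bool at build time.
    fn kafka_supported() -> bool {
        cfg!(any(
            all(target_os = "linux", target_arch = "x86_64"),
            all(target_os = "macos", target_arch = "aarch64")
        ))
    }

    fn main() {
        println!("Kafka support compiled in: {}", kafka_supported());
    }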

src/cli.rs

Lines changed: 33 additions & 7 deletions

@@ -26,9 +26,17 @@ use crate::{
     option::{validation, Compression, Mode}, storage::{AzureBlobConfig, FSConfig, S3Config},
 };
 
-#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+#[cfg(any(
+    all(target_os = "linux", target_arch = "x86_64"),
+    all(target_os = "macos", target_arch = "aarch64")
+))]
 use crate::kafka::SslProtocol as KafkaSslProtocol;
 
+#[cfg(not(any(
+    all(target_os = "linux", target_arch = "x86_64"),
+    all(target_os = "macos", target_arch = "aarch64")
+)))]
+use std::string::String as KafkaSslProtocol;
 
 /// Default username and password for Parseable server, used by default for local mode.
 /// NOTE: obviously not recommended for production
@@ -317,23 +325,38 @@ pub struct Options {
     oidc_issuer: Option<Url>,
 
     // Kafka configuration (conditionally compiled)
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     #[arg(long, env = "P_KAFKA_TOPICS", help = "Kafka topics to subscribe to")]
     pub kafka_topics: Option<String>,
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     #[arg(long, env = "P_KAFKA_HOST", help = "Address and port for Kafka server")]
     pub kafka_host: Option<String>,
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
    #[arg(long, env = "P_KAFKA_GROUP", help = "Kafka group")]
     pub kafka_group: Option<String>,
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     #[arg(long, env = "P_KAFKA_CLIENT_ID", help = "Kafka client id")]
     pub kafka_client_id: Option<String>,
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     #[arg(
         long,
         env = "P_KAFKA_SECURITY_PROTOCOL",
@@ -342,7 +365,10 @@ pub struct Options {
     )]
     pub kafka_security_protocol: Option<KafkaSslProtocol>,
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     #[arg(long, env = "P_KAFKA_PARTITIONS", help = "Kafka partitions")]
     pub kafka_partitions: Option<String>,
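
The fallback import is the notable trick in this file: on targets without Kafka support, the clap field kafka_security_protocol: Option<KafkaSslProtocol> still needs some type so that Options compiles everywhere, and aliasing std::string::String provides one. A self-contained sketch of the pattern (simplified to a single-target predicate; the module and names are illustrative, not the crate's real layout):

    // On a "supported" target the alias points at the real enum; elsewhere
    // String stands in, so the field definition compiles unconditionally.
    #[cfg(target_os = "linux")]
    mod kafka {
        #[allow(dead_code)]
        #[derive(Debug, Clone)]
        pub enum SslProtocol { Plaintext, Ssl }
    }

    #[cfg(target_os = "linux")]
    use kafka::SslProtocol as KafkaSslProtocol;

    #[cfg(not(target_os = "linux"))]
    use std::string::String as KafkaSslProtocol;

    struct Options {
        kafka_security_protocol: Option<KafkaSslProtocol>,
    }

    fn main() {
        let opts = Options { kafka_security_protocol: None };
        assert!(opts.kafka_security_protocol.is_none());
    }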

src/handlers/http/modal/ingest_server.rs

Lines changed: 80 additions & 85 deletions

@@ -45,7 +45,6 @@ use crate::{handlers::http::base_path, option::CONFIG};
 use actix_web::web;
 use actix_web::web::resource;
 use actix_web::Scope;
-use anyhow::anyhow;
 use async_trait::async_trait;
 use base64::Engine;
 use bytes::Bytes;
@@ -85,14 +84,14 @@ impl ParseableServer for IngestServer {
         // parseable can't use local storage for persistence when running a distributed setup
         if CONFIG.get_storage_mode_string() == "Local drive" {
             return Err(anyhow::Error::msg(
-            "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
-        ));
+                "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
+            ));
         }
 
         // check for querier state. Is it there, or was it there in the past
-        let parseable_json = self.check_querier_state().await?;
+        let parseable_json = check_querier_state().await?;
         // to get the .parseable.json file in staging
-        self.validate_credentials().await?;
+        validate_credentials().await?;
 
         Ok(parseable_json)
     }
@@ -112,7 +111,7 @@ impl ParseableServer for IngestServer {
         tokio::spawn(airplane::server());
 
         // set the ingestor metadata
-        self.set_ingestor_metadata().await?;
+        set_ingestor_metadata().await?;
 
         // Ingestors shouldn't have to deal with OpenId auth flow
         let app = self.start(prometheus, None);
@@ -278,96 +277,92 @@ impl IngestServer {
             ),
         )
     }
+}
+
+// create the ingestor metadata and put the .ingestor.json file in the object store
+pub async fn set_ingestor_metadata() -> anyhow::Result<()> {
+    let storage_ingestor_metadata = migrate_ingester_metadata().await?;
+    let store = CONFIG.storage().get_object_store();
+
+    // find the meta file in staging if not generate new metadata
+    let resource = INGESTOR_META.clone();
+    // use the id that was generated/found in the staging and
+    // generate the path for the object store
+    let path = ingestor_metadata_path(None);
+
+    // we are considering that we can always get from object store
+    if let Some(mut store_data) = storage_ingestor_metadata {
+        if store_data.domain_name != INGESTOR_META.domain_name {
+            store_data
+                .domain_name
+                .clone_from(&INGESTOR_META.domain_name);
+            store_data.port.clone_from(&INGESTOR_META.port);
 
-    // create the ingestor metadata and put the .ingestor.json file in the object store
-    async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
-        let storage_ingestor_metadata = migrate_ingester_metadata().await?;
-        let store = CONFIG.storage().get_object_store();
-
-        // find the meta file in staging if not generate new metadata
-        let resource = INGESTOR_META.clone();
-        // use the id that was generated/found in the staging and
-        // generate the path for the object store
-        let path = ingestor_metadata_path(None);
-
-        // we are considering that we can always get from object store
-        if storage_ingestor_metadata.is_some() {
-            let mut store_data = storage_ingestor_metadata.unwrap();
-
-            if store_data.domain_name != INGESTOR_META.domain_name {
-                store_data
-                    .domain_name
-                    .clone_from(&INGESTOR_META.domain_name);
-                store_data.port.clone_from(&INGESTOR_META.port);
-
-                let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                // if pushing to object store fails propagate the error
-                return store
-                    .put_object(&path, resource)
-                    .await
-                    .map_err(|err| anyhow!(err));
-            }
-        } else {
-            let resource = Bytes::from(serde_json::to_vec(&resource)?);
+            let resource = Bytes::from(serde_json::to_vec(&store_data)?);
 
+            // if pushing to object store fails propagate the error
             store.put_object(&path, resource).await?;
         }
+    } else {
+        let resource = Bytes::from(serde_json::to_vec(&resource)?);
 
-        Ok(())
+        store.put_object(&path, resource).await?;
     }
 
-    // check for querier state. Is it there, or was it there in the past
-    // this should happen before the set the ingestor metadata
-    async fn check_querier_state(&self) -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
-        // how do we check for querier state?
-        // based on the work flow of the system, the querier will always need to start first
-        // i.e the querier will create the `.parseable.json` file
-
-        let store = CONFIG.storage().get_object_store();
-        let path = parseable_json_path();
+    Ok(())
+}
 
-        let parseable_json = store.get_object(&path).await;
-        match parseable_json {
-            Ok(_) => Ok(Some(parseable_json.unwrap())),
-            Err(_) => Err(ObjectStorageError::Custom(
+// check for querier state. Is it there, or was it there in the past
+// this should happen before the set the ingestor metadata
+async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
+    // how do we check for querier state?
+    // based on the work flow of the system, the querier will always need to start first
+    // i.e the querier will create the `.parseable.json` file
+    let parseable_json = CONFIG
+        .storage()
+        .get_object_store()
+        .get_object(&parseable_json_path())
+        .await
+        .map_err(|_| {
+            ObjectStorageError::Custom(
                 "Query Server has not been started yet. Please start the querier server first."
                     .to_string(),
-            )),
-        }
-    }
-
-    async fn validate_credentials(&self) -> anyhow::Result<()> {
-        // check if your creds match with others
-        let store = CONFIG.storage().get_object_store();
-        let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
-        let ingestor_metadata = store
-            .get_objects(
-                Some(&base_path),
-                Box::new(|file_name| file_name.starts_with("ingestor")),
             )
-            .await?;
-        if !ingestor_metadata.is_empty() {
-            let ingestor_metadata_value: Value =
-                serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
-            let check = ingestor_metadata_value
-                .as_object()
-                .and_then(|meta| meta.get("token"))
-                .and_then(|token| token.as_str())
-                .unwrap();
-
-            let token = base64::prelude::BASE64_STANDARD.encode(format!(
-                "{}:{}",
-                CONFIG.options.username, CONFIG.options.password
-            ));
-
-            let token = format!("Basic {}", token);
-
-            if check != token {
-                return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
-            }
-        }
+        })?;
+
+    Ok(Some(parseable_json))
+}
 
-        Ok(())
+async fn validate_credentials() -> anyhow::Result<()> {
+    // check if your creds match with others
+    let store = CONFIG.storage().get_object_store();
+    let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+    let ingestor_metadata = store
+        .get_objects(
+            Some(&base_path),
+            Box::new(|file_name| file_name.starts_with("ingestor")),
+        )
+        .await?;
+    if !ingestor_metadata.is_empty() {
+        let ingestor_metadata_value: Value =
+            serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
+        let check = ingestor_metadata_value
+            .as_object()
+            .and_then(|meta| meta.get("token"))
+            .and_then(|token| token.as_str())
+            .unwrap();
+
+        let token = base64::prelude::BASE64_STANDARD.encode(format!(
+            "{}:{}",
+            CONFIG.options.username, CONFIG.options.password
+        ));
+
+        let token = format!("Basic {}", token);
+
+        if check != token {
+            return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
+        }
     }
+
+    Ok(())
 }
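
Beyond widening the cfg gates, this file is refactored: set_ingestor_metadata, check_querier_state, and validate_credentials never used self, so they become free functions, and two cleanups land with them. if let Some(mut store_data) replaces the is_some()/unwrap() pair, and map_err replaces a match that re-unwrap()ed its own Ok value; replacing .map_err(|err| anyhow!(err)) with plain ? also frees the use anyhow::anyhow; import. A standalone sketch of those two idioms with simplified types (not the server code itself):

    // `if let Some(mut x)` binds and mutates in one step; no unwrap() needed.
    fn update(stored: Option<String>, port: &str) -> Result<String, String> {
        if let Some(mut data) = stored {
            data.push_str(port);
            return Ok(data);
        }
        Err("no stored metadata".to_string())
    }

    // map_err converts the error in place of a match whose Ok arm only
    // re-unwrapped the value it had already matched on.
    fn fetch(raw: Result<String, ()>) -> Result<String, String> {
        raw.map_err(|_| "querier has not started yet".to_string())
    }

    fn main() {
        assert_eq!(update(Some("host:".into()), "8000").unwrap(), "host:8000");
        assert!(update(None, "8000").is_err());
        assert_eq!(fetch(Ok("json".into())).unwrap(), "json");
        assert!(fetch(Err(())).is_err());
    }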

src/lib.rs

Lines changed: 4 additions & 1 deletion

@@ -27,7 +27,10 @@ pub mod correlation;
 mod event;
 pub mod handlers;
 pub mod hottier;
-#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+#[cfg(any(
+    all(target_os = "linux", target_arch = "x86_64"),
+    all(target_os = "macos", target_arch = "aarch64")
+))]
 pub mod kafka;
 mod livetail;
 mod metadata;

src/main.rs

Lines changed: 8 additions & 2 deletions

@@ -23,7 +23,10 @@ use parseable::{
 };
 use tracing_subscriber::EnvFilter;
 
-#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+#[cfg(any(
+    all(target_os = "linux", target_arch = "x86_64"),
+    all(target_os = "macos", target_arch = "aarch64")
+))]
 use parseable::kafka;
 
 #[actix_web::main]
@@ -49,7 +52,10 @@ async fn main() -> anyhow::Result<()> {
     // keep metadata info in mem
     metadata.set_global();
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     // load kafka server
     if CONFIG.options.mode != Mode::Query {
         tokio::task::spawn(kafka::setup_integration());
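
The integration is started with tokio::task::spawn, so the Kafka consumer runs as a detached background task and startup continues without awaiting it; query-mode nodes skip it entirely. A minimal sketch of that pattern (assumes the tokio crate with its "full" feature; the task body is a stand-in for kafka::setup_integration()):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // Detached task: startup continues immediately while the consumer
        // loop (stood in for by a print here) runs in the background.
        tokio::task::spawn(async {
            println!("kafka integration running");
        });

        // ... the HTTP servers would be started here; a short sleep keeps
        // this sketch alive long enough for the spawned task to run.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }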

src/option.rs

Lines changed: 8 additions & 2 deletions

@@ -199,7 +199,10 @@ pub mod validation {
 
     use human_size::{multiples, SpecificSize};
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     use crate::kafka::SslProtocol;
 
     use super::{Compression, Mode};
@@ -246,7 +249,10 @@ pub mod validation {
         url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
     }
 
-    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+    #[cfg(any(
+        all(target_os = "linux", target_arch = "x86_64"),
+        all(target_os = "macos", target_arch = "aarch64")
+    ))]
     pub fn kafka_security_protocol(s: &str) -> Result<SslProtocol, String> {
         match s {
             "plaintext" => Ok(SslProtocol::Plaintext),
