Skip to content

Commit c72864f

Browse files
author
Devdutt Shenoi
committed
fix: kafka security protocol
1 parent e36a3e7 commit c72864f

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

src/cli.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -318,33 +318,32 @@ pub struct Options {
318318

319319
// Kafka configuration (conditionally compiled)
320320
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
321-
#[arg(
322-
long,env = "P_KAFKA_TOPICS", help = "Kafka topics to subscribe to")]
321+
#[arg(long, env = "P_KAFKA_TOPICS", help = "Kafka topics to subscribe to")]
323322
pub kafka_topics: Option<String>,
324323

325324
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
326-
#[arg(
327-
long,env = "P_KAFKA_HOST", help = "Address and port for Kafka server")]
325+
#[arg(long, env = "P_KAFKA_HOST", help = "Address and port for Kafka server")]
328326
pub kafka_host: Option<String>,
329327

330328
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
331-
#[arg(
332-
long,env = "P_KAFKA_GROUP", help = "Kafka group")]
329+
#[arg(long, env = "P_KAFKA_GROUP", help = "Kafka group")]
333330
pub kafka_group: Option<String>,
334331

335332
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
336-
#[arg(
337-
long,env = "P_KAFKA_CLIENT_ID", help = "Kafka client id")]
333+
#[arg(long, env = "P_KAFKA_CLIENT_ID", help = "Kafka client id")]
338334
pub kafka_client_id: Option<String>,
339335

340336
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
341337
#[arg(
342-
long,env = "P_KAFKA_SECURITY_PROTOCOL", help = "Kafka security protocol")]
338+
long,
339+
env = "P_KAFKA_SECURITY_PROTOCOL",
340+
value_parser = validation::kafka_security_protocol,
341+
help = "Kafka security protocol"
342+
)]
343343
pub kafka_security_protocol: Option<KafkaSslProtocol>,
344344

345345
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
346-
#[arg(
347-
long,env = "P_KAFKA_PARTITIONS", help = "Kafka partitions")]
346+
#[arg(long, env = "P_KAFKA_PARTITIONS", help = "Kafka partitions")]
348347
pub kafka_partitions: Option<String>,
349348

350349
// Audit logging

src/option.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ use serde::{Deserialize, Serialize};
2828
use std::path::PathBuf;
2929
use std::sync::Arc;
3030

31+
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
32+
use crate::kafka::KafkaSslProtocol;
33+
3134
pub const JOIN_COMMUNITY: &str =
3235
"Join us on Parseable Slack community for questions : https://logg.ing/community";
3336
pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));
@@ -243,6 +246,17 @@ pub mod validation {
243246
url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
244247
}
245248

249+
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
250+
pub fn kafka_security_protocol(s: &str) -> Result<KafkaSslProtocol, String> {
251+
match s {
252+
"plaintext" => Ok(KafkaSslProtocol::Plaintext),
253+
"ssl" => Ok(KafkaSslProtocol::Ssl),
254+
"sasl_plaintext" => Ok(KafkaSslProtocol::SaslPlaintext),
255+
"sasl_ssl" => Ok(KafkaSslProtocol::SaslSsl),
256+
_ => Err("Invalid Kafka Security Protocol provided".to_string()),
257+
}
258+
}
259+
246260
pub fn mode(s: &str) -> Result<Mode, String> {
247261
match s {
248262
"query" => Ok(Mode::Query),

0 commit comments

Comments
 (0)