From f2a0ec22d64e33c2fa38d2e3e4e510b776c34fd4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 10 Feb 2025 02:28:49 +0530 Subject: [PATCH] refactor: process ain't async --- src/connectors/kafka/processor.rs | 5 +---- src/event/mod.rs | 2 +- src/handlers/http/ingest.rs | 4 ++-- src/handlers/http/modal/utils/ingest_utils.rs | 3 +-- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index e06835121..b5b8f4fe4 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -103,10 +103,7 @@ impl Processor, ()> for ParseableSinkProcessor { let len = records.len(); debug!("Processing {} records", len); - self.build_event_from_chunk(&records) - .await? - .process() - .await?; + self.build_event_from_chunk(&records).await?.process()?; debug!("Processed {} records", len); Ok(()) diff --git a/src/event/mod.rs b/src/event/mod.rs index 1178c7138..eaa324699 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -46,7 +46,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub async fn process(self) -> Result<(), EventError> { + pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index b3da07761..39b9184e5 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -103,8 +103,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< custom_partition_values: HashMap::new(), stream_type: StreamType::Internal, } - .process() - .await?; + .process()?; + Ok(()) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3a2b9c797..a3e9af22d 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -129,8 +129,7 @@ pub async fn push_logs( custom_partition_values, stream_type: StreamType::UserDefined, } - .process() - .await?; + .process()?; } Ok(()) }