diff --git a/src/event/writer/file_writer.rs b/src/event/writer/file_writer.rs
index 0b990421d..fed5afa7c 100644
--- a/src/event/writer/file_writer.rs
+++ b/src/event/writer/file_writer.rs
@@ -29,8 +29,6 @@ use crate::storage::staging::StorageDir;
 use chrono::NaiveDateTime;
 
 pub struct ArrowWriter {
-    #[allow(dead_code)]
-    pub file_path: PathBuf,
     pub writer: StreamWriter<File>,
 }
 
@@ -54,20 +52,14 @@ impl FileWriter {
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(
+                let (_, writer) = init_new_stream_writer_file(
                     stream_name,
                     schema_key,
                     record,
                     parsed_timestamp,
                     custom_partition_values,
                 )?;
-                self.insert(
-                    schema_key.to_owned(),
-                    ArrowWriter {
-                        file_path: path,
-                        writer,
-                    },
-                );
+                self.insert(schema_key.to_owned(), ArrowWriter { writer });
             }
         };
 
diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs
index 0e64cd283..b41582d70 100644
--- a/src/handlers/http/cluster/utils.rs
+++ b/src/handlers/http/cluster/utils.rs
@@ -16,17 +16,10 @@
  *
  */
 
-use crate::{
-    handlers::http::{
-        base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
-    },
-    HTTP_CLIENT,
-};
+use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
-use http::StatusCode;
 use itertools::Itertools;
-use reqwest::Response;
 use serde::{Deserialize, Serialize};
 use tracing::error;
 use url::Url;
@@ -247,65 +240,6 @@ pub async fn check_liveness(domain_name: &str) -> bool {
     req.is_ok()
 }
 
-/// send a request to the ingestor to fetch its stats
-/// dead for now
-#[allow(dead_code)]
-pub async fn send_stats_request(
-    url: &str,
-    ingestor: IngestorMetadata,
-) -> Result<Option<Response>, StreamError> {
-    if !check_liveness(&ingestor.domain_name).await {
-        return Ok(None);
-    }
-
-    let res = HTTP_CLIENT
-        .get(url)
-        .header(header::CONTENT_TYPE, "application/json")
-        .header(header::AUTHORIZATION, ingestor.token)
-        .send()
-        .await
-        .map_err(|err| {
-            error!(
-                "Fatal: failed to fetch stats from ingestor: {}\n Error: {:?}",
-                ingestor.domain_name, err
-            );
-
-            StreamError::Network(err)
-        })?;
-
-    if !res.status().is_success() {
-        error!(
-            "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
-            ingestor.domain_name, res
-        );
-        return Err(StreamError::Custom {
-            msg: format!(
-                "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
-                ingestor.domain_name,
-                res.text().await.unwrap_or_default()
-            ),
-            status: StatusCode::INTERNAL_SERVER_ERROR,
-        });
-    }
-
-    Ok(Some(res))
-}
-
-/// domain_name needs to be http://ip:port
-/// dead code for now
-#[allow(dead_code)]
-pub fn ingestor_meta_filename(domain_name: &str) -> String {
-    if domain_name.starts_with("http://") | domain_name.starts_with("https://") {
-        let url = Url::parse(domain_name).unwrap();
-        return format!(
-            "ingestor.{}.{}.json",
-            url.host_str().unwrap(),
-            url.port().unwrap()
-        );
-    }
-    format!("ingestor.{}.json", domain_name)
-}
-
 pub fn to_url_string(str: String) -> String {
     // if the str is already a url i am guessing that it will end in '/'
     if str.starts_with("http://") || str.starts_with("https://") {
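Side note on the locking idiom kept as context in the file_writer.rs hunk above ("we drop this read lock and wait for write lock"): lookups happen under a read guard, and only a miss escalates to the write lock. A minimal standalone sketch of that idiom, assuming a hypothetical `WriterTable` type that is not this repo's actual writer map:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-in for the RwLock<HashMap<..>> behind FileWriter.
struct WriterTable {
    inner: RwLock<HashMap<String, u64>>,
}

impl WriterTable {
    fn write_or_init(&self, key: &str) {
        let map = self.inner.read().unwrap();
        if map.contains_key(key) {
            return; // fast path: entry exists, the read lock suffices
        }
        // the insert needs a mutable borrow of the map, so drop the
        // read lock and wait for the write lock
        drop(map);
        self.inner
            .write()
            .unwrap()
            .entry(key.to_owned())
            .or_insert(0);
    }
}

fn main() {
    let table = WriterTable {
        inner: RwLock::new(HashMap::new()),
    };
    table.write_or_init("schema_key");
    assert!(table.inner.read().unwrap().contains_key("schema_key"));
}
```

Going through `entry()` under the write lock re-checks the key, so two writers that both miss under the read lock cannot clobber each other's insert.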
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index e106a2de9..903b51ebd 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -465,18 +465,6 @@ pub async fn get_stats(
     Ok((web::Json(stats), StatusCode::OK))
 }
 
-// Check if the first_event_at is empty
-#[allow(dead_code)]
-pub fn first_event_at_empty(stream_name: &str) -> bool {
-    let hash_map = STREAM_INFO.read().unwrap();
-    if let Some(stream_info) = hash_map.get(stream_name) {
-        if let Some(first_event_at) = &stream_info.first_event_at {
-            return first_event_at.is_empty();
-        }
-    }
-    true
-}
-
 fn remove_id_from_alerts(value: &mut Value) {
     if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
         alerts
diff --git a/src/hottier.rs b/src/hottier.rs
index 29039a1b4..c9e66c897 100644
--- a/src/hottier.rs
+++ b/src/hottier.rs
@@ -397,24 +397,6 @@ impl HotTierManager {
         Ok(file_processed)
     }
 
-    #[allow(dead_code)]
-    ///delete the files for the date range given from the hot tier directory for the stream
-    /// update the used and available size in the hot tier metadata
-    pub async fn delete_files_from_hot_tier(
-        &self,
-        stream: &str,
-        dates: &[NaiveDate],
-    ) -> Result<(), HotTierError> {
-        for date in dates.iter() {
-            let path = self.hot_tier_path.join(format!("{}/date={}", stream, date));
-            if path.exists() {
-                fs::remove_dir_all(path.clone()).await?;
-            }
-        }
-
-        Ok(())
-    }
-
     ///fetch the list of dates available in the hot tier directory for the stream and sort them
     pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
         let mut date_list = Vec::new();
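For the record, the removed `delete_files_from_hot_tier` simply walked `stream/date=YYYY-MM-DD` directories and removed each one. A rough standalone sketch of that shape, should it ever be reintroduced (the base path, stream name, and tokio runtime setup are assumptions here, not this repo's code):

```rust
use std::path::PathBuf;

// Hypothetical sketch mirroring the removed helper: delete the
// date-partitioned hot tier directories for the given dates.
async fn delete_date_dirs(
    base: PathBuf,
    stream: &str,
    dates: &[chrono::NaiveDate],
) -> std::io::Result<()> {
    for date in dates {
        // layout assumed here: <base>/<stream>/date=YYYY-MM-DD
        let path = base.join(format!("{}/date={}", stream, date));
        if path.exists() {
            tokio::fs::remove_dir_all(&path).await?;
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    delete_date_dirs(PathBuf::from("/tmp/hot-tier"), "demo", &[]).await
}
```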
diff --git a/src/kafka.rs b/src/kafka.rs
index 9ba697a97..423d227ee 100644
--- a/src/kafka.rs
+++ b/src/kafka.rs
@@ -55,7 +55,6 @@ pub enum SslProtocol {
     SaslSsl,
 }
 
-#[allow(dead_code)]
 #[derive(Debug, thiserror::Error)]
 pub enum KafkaError {
     #[error("Please set env var {0} (To use Kafka integration env vars P_KAFKA_TOPICS, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")]
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index b9e2bc555..1896bce0c 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -320,11 +320,4 @@ pub mod error {
                 .body(self.to_string())
         }
     }
-
-    #[allow(dead_code)]
-    fn construct_custom_error() {
-        let error =
-            MetricsError::Custom("Some error".to_string(), StatusCode::INTERNAL_SERVER_ERROR);
-        println!("{:?}", error);
-    }
 }
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 424b842db..857cf18df 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -20,8 +20,8 @@ mod filter_optimizer;
 mod listing_table_builder;
 pub mod stream_schema_provider;
 
+use chrono::NaiveDateTime;
 use chrono::{DateTime, Duration, Utc};
-use chrono::{NaiveDateTime, TimeZone};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
@@ -38,9 +38,7 @@ use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
-use std::collections::HashMap;
 use std::ops::Bound;
-use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use stream_schema_provider::collect_manifest_files;
 use sysinfo::System;
@@ -56,7 +54,7 @@ use crate::event;
 use crate::handlers::http::query::QueryError;
 use crate::metadata::STREAM_INFO;
 use crate::option::{Mode, CONFIG};
-use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, StorageDir, STREAM_ROOT_DIRECTORY};
+use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, STREAM_ROOT_DIRECTORY};
 use crate::utils::time::TimeRange;
 
 pub static QUERY_SESSION: Lazy<SessionContext> = Lazy::new(|| Query::create_session_context(CONFIG.storage()));
@@ -396,10 +394,6 @@ impl TableScanVisitor {
     pub fn into_inner(self) -> Vec<String> {
         self.tables
     }
-    #[allow(dead_code)]
-    pub fn top(&self) -> Option<&str> {
-        self.tables.first().map(|s| s.as_ref())
-    }
 }
 
 impl TreeNodeVisitor<'_> for TableScanVisitor {
@@ -563,47 +557,6 @@ fn table_contains_any_time_filters(
         })
 }
 
-#[allow(dead_code)]
-fn get_staging_prefixes(
-    stream_name: &str,
-    start: DateTime<Utc>,
-    end: DateTime<Utc>,
-) -> HashMap<PathBuf, Vec<PathBuf>> {
-    let dir = StorageDir::new(stream_name);
-    let mut files = dir.arrow_files_grouped_by_time();
-    files.retain(|k, _| path_intersects_query(k, start, end));
-    files
-}
-
-fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
-    let time = time_from_path(path);
-    starttime <= time && time <= endtime
-}
-
-fn time_from_path(path: &Path) -> DateTime<Utc> {
-    let prefix = path
-        .file_name()
-        .expect("all given path are file")
-        .to_str()
-        .expect("filename is valid");
-
-    // Next three in order will be date, hour and minute
-    let mut components = prefix.splitn(3, '.');
-
-    let date = components.next().expect("date=xxxx-xx-xx");
-    let hour = components.next().expect("hour=xx");
-    let minute = components.next().expect("minute=xx");
-
-    let year = date[5..9].parse().unwrap();
-    let month = date[10..12].parse().unwrap();
-    let day = date[13..15].parse().unwrap();
-    let hour = hour[5..7].parse().unwrap();
-    let minute = minute[7..9].parse().unwrap();
-
-    Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
-        .unwrap()
-}
-
 /// unused for now might need it later
 #[allow(unused)]
 pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
@@ -671,16 +624,6 @@ mod tests {
 
     use crate::query::flatten_objects_for_count;
 
-    use super::time_from_path;
-    use std::path::PathBuf;
-
-    #[test]
-    fn test_time_from_parquet_path() {
-        let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
-        let time = time_from_path(path.as_path());
-        assert_eq!(time.timestamp(), 1640995200);
-    }
-
     #[test]
     fn test_flat_simple() {
         let val = vec![
diff --git a/src/response.rs b/src/response.rs
index bbab052ae..8211bf4c7 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -16,18 +16,11 @@
  *
  */
 
-use crate::{
-    handlers::http::query::QueryError,
-    utils::arrow::{
-        flight::{into_flight_data, DoGetStream},
-        record_batches_to_json,
-    },
-};
+use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
 use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
-use tonic::{Response, Status};
 use tracing::info;
 
 pub struct QueryResponse {
@@ -65,9 +58,4 @@ impl QueryResponse {
 
         Ok(HttpResponse::Ok().json(response))
     }
-
-    #[allow(dead_code)]
-    pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
-        into_flight_data(self.records)
-    }
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 85c46dade..f86b55757 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -258,8 +258,4 @@ pub enum ObjectStorageError {
     PathError(relative_path::FromPathError),
     #[error("Error: {0}")]
     MetadataError(#[from] MetadataError),
-
-    #[allow(dead_code)]
-    #[error("Authentication Error: {0}")]
-    AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>),
 }
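The error enums touched in this diff (KafkaError, MetricsError, ObjectStorageError) all lean on thiserror, which turns each `#[error("...")]` attribute into the variant's Display output. An illustrative, self-contained sketch of that mechanism; `DemoError` is invented for the example and not part of the codebase:

```rust
// Illustrative only: shows how thiserror turns `#[error]` attributes
// into a Display impl, the same mechanism behind the enums above.
#[derive(Debug, thiserror::Error)]
enum DemoError {
    #[error("Error: {0}")]
    Message(String),
    #[error("Authentication Error: {0}")]
    Auth(Box<dyn std::error::Error + Send + Sync + 'static>),
}

fn main() {
    let err = DemoError::Message("disk full".to_string());
    // prints "Error: disk full" via the derived Display impl
    println!("{}", err);
}
```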
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index e115dd0bf..04ff44de7 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -50,9 +50,8 @@ use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
 use std::collections::HashMap;
 
-#[allow(dead_code)]
 // in bytes
-const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
+// const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
 const CONNECT_TIMEOUT_SECS: u64 = 5;
 const REQUEST_TIMEOUT_SECS: u64 = 300;
 const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
diff --git a/src/storage/staging.rs b/src/storage/staging.rs
index c05ef62c4..aab6603d9 100644
--- a/src/storage/staging.rs
+++ b/src/storage/staging.rs
@@ -136,22 +136,6 @@ impl StorageDir {
         paths
     }
 
-    #[allow(dead_code)]
-    pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
-        // hashmap<time, Vec<arrow_file_path>>
-        let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
-        let arrow_files = self.arrow_files();
-        for arrow_file_path in arrow_files {
-            let key = Self::arrow_path_to_parquet(&arrow_file_path, String::default());
-            grouped_arrow_file
-                .entry(key)
-                .or_default()
-                .push(arrow_file_path);
-        }
-
-        grouped_arrow_file
-    }
-
     pub fn arrow_files_grouped_exclude_time(
         &self,
         exclude: NaiveDateTime,
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index e539b9e9f..dd75504b7 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -37,12 +37,6 @@ use std::collections::HashMap;
 use std::env;
 use tracing::debug;
 use url::Url;
-#[allow(dead_code)]
-pub fn hostname() -> Option<String> {
-    hostname::get()
-        .ok()
-        .and_then(|hostname| hostname.into_string().ok())
-}
 
 pub fn hostname_unchecked() -> String {
     hostname::get().unwrap().into_string().unwrap()
@@ -95,7 +89,6 @@ pub struct TimePeriod {
     data_granularity: u32,
 }
 
-#[allow(dead_code)]
 impl TimePeriod {
     pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, data_granularity: u32) -> Self {
         Self {
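Worth noting the compiler mechanism this whole cleanup leans on: once an `#[allow(dead_code)]` attribute is removed, rustc's `dead_code` lint warns about any item that is still unreachable, so deleting the items outright (rather than re-suppressing the lint) is what keeps the build warning-free. A tiny illustration with a hypothetical function name:

```rust
// Hypothetical example: with no #[allow(dead_code)] on it, rustc
// warns "function `unused_helper` is never used" (lint: dead_code).
fn unused_helper() -> u32 {
    42
}

fn main() {
    println!("compiles, but with a dead_code warning for `unused_helper`");
}
```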