src/event/writer/file_writer.rs (12 changes: 2 additions & 10 deletions)
@@ -29,8 +29,6 @@ use crate::storage::staging::StorageDir;
use chrono::NaiveDateTime;

pub struct ArrowWriter {
#[allow(dead_code)]
pub file_path: PathBuf,
pub writer: StreamWriter<File>,
}

@@ -54,20 +52,14 @@ impl FileWriter {
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(
let (_, writer) = init_new_stream_writer_file(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);
self.insert(schema_key.to_owned(), ArrowWriter { writer });
}
};

src/handlers/http/cluster/utils.rs (68 changes: 1 addition & 67 deletions)
@@ -16,17 +16,10 @@
*
*/

use crate::{
handlers::http::{
base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
},
HTTP_CLIENT,
};
use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
use actix_web::http::header;
use chrono::{DateTime, Utc};
use http::StatusCode;
use itertools::Itertools;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use tracing::error;
use url::Url;
@@ -247,65 +240,6 @@ pub async fn check_liveness(domain_name: &str) -> bool {
req.is_ok()
}

/// send a request to the ingestor to fetch its stats
/// dead for now
#[allow(dead_code)]
pub async fn send_stats_request(
url: &str,
ingestor: IngestorMetadata,
) -> Result<Option<Response>, StreamError> {
if !check_liveness(&ingestor.domain_name).await {
return Ok(None);
}

let res = HTTP_CLIENT
.get(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
.send()
.await
.map_err(|err| {
error!(
"Fatal: failed to fetch stats from ingestor: {}\n Error: {:?}",
ingestor.domain_name, err
);

StreamError::Network(err)
})?;

if !res.status().is_success() {
error!(
"failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name, res
);
return Err(StreamError::Custom {
msg: format!(
"failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await.unwrap_or_default()
),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
}

Ok(Some(res))
}

/// domain_name needs to be http://ip:port
/// dead code for now
#[allow(dead_code)]
pub fn ingestor_meta_filename(domain_name: &str) -> String {
if domain_name.starts_with("http://") | domain_name.starts_with("https://") {
let url = Url::parse(domain_name).unwrap();
return format!(
"ingestor.{}.{}.json",
url.host_str().unwrap(),
url.port().unwrap()
);
}
format!("ingestor.{}.json", domain_name)
}

pub fn to_url_string(str: String) -> String {
// if the str is already a url i am guessing that it will end in '/'
if str.starts_with("http://") || str.starts_with("https://") {
src/handlers/http/logstream.rs (12 changes: 0 additions & 12 deletions)
@@ -465,18 +465,6 @@ pub async fn get_stats(
Ok((web::Json(stats), StatusCode::OK))
}

// Check if the first_event_at is empty
#[allow(dead_code)]
pub fn first_event_at_empty(stream_name: &str) -> bool {
let hash_map = STREAM_INFO.read().unwrap();
if let Some(stream_info) = hash_map.get(stream_name) {
if let Some(first_event_at) = &stream_info.first_event_at {
return first_event_at.is_empty();
}
}
true
}

fn remove_id_from_alerts(value: &mut Value) {
if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
alerts
src/hottier.rs (18 changes: 0 additions & 18 deletions)
@@ -397,24 +397,6 @@ impl HotTierManager {
Ok(file_processed)
}

#[allow(dead_code)]
///delete the files for the date range given from the hot tier directory for the stream
/// update the used and available size in the hot tier metadata
pub async fn delete_files_from_hot_tier(
&self,
stream: &str,
dates: &[NaiveDate],
) -> Result<(), HotTierError> {
for date in dates.iter() {
let path = self.hot_tier_path.join(format!("{}/date={}", stream, date));
if path.exists() {
fs::remove_dir_all(path.clone()).await?;
}
}

Ok(())
}

///fetch the list of dates available in the hot tier directory for the stream and sort them
pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
let mut date_list = Vec::new();
src/kafka.rs (1 change: 0 additions & 1 deletion)
@@ -55,7 +55,6 @@ pub enum SslProtocol {
SaslSsl,
}

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
pub enum KafkaError {
#[error("Please set env var {0} (To use Kafka integration env vars P_KAFKA_TOPICS, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")]
src/metrics/mod.rs (7 changes: 0 additions & 7 deletions)
@@ -320,11 +320,4 @@ pub mod error {
.body(self.to_string())
}
}

#[allow(dead_code)]
fn construct_custom_error() {
let error =
MetricsError::Custom("Some error".to_string(), StatusCode::INTERNAL_SERVER_ERROR);
println!("{:?}", error);
}
}
src/query/mod.rs (61 changes: 2 additions & 59 deletions)
@@ -20,8 +20,8 @@ mod filter_optimizer;
mod listing_table_builder;
pub mod stream_schema_provider;

use chrono::NaiveDateTime;
use chrono::{DateTime, Duration, Utc};
use chrono::{NaiveDateTime, TimeZone};
use datafusion::arrow::record_batch::RecordBatch;

use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
@@ -38,9 +38,7 @@ use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use stream_schema_provider::collect_manifest_files;
use sysinfo::System;
@@ -56,7 +54,7 @@ use crate::event;
use crate::handlers::http::query::QueryError;
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, StorageDir, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, STREAM_ROOT_DIRECTORY};
use crate::utils::time::TimeRange;
pub static QUERY_SESSION: Lazy<SessionContext> =
Lazy::new(|| Query::create_session_context(CONFIG.storage()));
@@ -396,10 +394,6 @@ impl TableScanVisitor {
pub fn into_inner(self) -> Vec<String> {
self.tables
}
#[allow(dead_code)]
pub fn top(&self) -> Option<&str> {
self.tables.first().map(|s| s.as_ref())
}
}

impl TreeNodeVisitor<'_> for TableScanVisitor {
@@ -563,47 +557,6 @@ fn table_contains_any_time_filters(
})
}

#[allow(dead_code)]
fn get_staging_prefixes(
stream_name: &str,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> HashMap<PathBuf, Vec<PathBuf>> {
let dir = StorageDir::new(stream_name);
let mut files = dir.arrow_files_grouped_by_time();
files.retain(|k, _| path_intersects_query(k, start, end));
files
}

fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
let time = time_from_path(path);
starttime <= time && time <= endtime
}

fn time_from_path(path: &Path) -> DateTime<Utc> {
let prefix = path
.file_name()
.expect("all given path are file")
.to_str()
.expect("filename is valid");

// Next three in order will be date, hour and minute
let mut components = prefix.splitn(3, '.');

let date = components.next().expect("date=xxxx-xx-xx");
let hour = components.next().expect("hour=xx");
let minute = components.next().expect("minute=xx");

let year = date[5..9].parse().unwrap();
let month = date[10..12].parse().unwrap();
let day = date[13..15].parse().unwrap();
let hour = hour[5..7].parse().unwrap();
let minute = minute[7..9].parse().unwrap();

Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
.unwrap()
}

/// unused for now might need it later
#[allow(unused)]
pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
@@ -671,16 +624,6 @@ mod tests {

use crate::query::flatten_objects_for_count;

use super::time_from_path;
use std::path::PathBuf;

#[test]
fn test_time_from_parquet_path() {
let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
let time = time_from_path(path.as_path());
assert_eq!(time.timestamp(), 1640995200);
}

#[test]
fn test_flat_simple() {
let val = vec![
src/response.rs (14 changes: 1 addition & 13 deletions)
@@ -16,18 +16,11 @@
*
*/

use crate::{
handlers::http::query::QueryError,
utils::arrow::{
flight::{into_flight_data, DoGetStream},
record_batches_to_json,
},
};
use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::HttpResponse;
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tonic::{Response, Status};
use tracing::info;

pub struct QueryResponse {
@@ -65,9 +58,4 @@ impl QueryResponse {

Ok(HttpResponse::Ok().json(response))
}

#[allow(dead_code)]
pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
into_flight_data(self.records)
}
}
src/storage/mod.rs (4 changes: 0 additions & 4 deletions)
@@ -258,8 +258,4 @@ pub enum ObjectStorageError {
PathError(relative_path::FromPathError),
#[error("Error: {0}")]
MetadataError(#[from] MetadataError),

#[allow(dead_code)]
#[error("Authentication Error: {0}")]
AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>),
}
src/storage/s3.rs (3 changes: 1 addition & 2 deletions)
@@ -50,9 +50,8 @@ use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
use std::collections::HashMap;

#[allow(dead_code)]
// in bytes
const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
// const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
const CONNECT_TIMEOUT_SECS: u64 = 5;
const REQUEST_TIMEOUT_SECS: u64 = 300;
const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
src/storage/staging.rs (16 changes: 0 additions & 16 deletions)
@@ -136,22 +136,6 @@ impl StorageDir {
paths
}

#[allow(dead_code)]
pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
// hashmap <time, vec[paths]>
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
let arrow_files = self.arrow_files();
for arrow_file_path in arrow_files {
let key = Self::arrow_path_to_parquet(&arrow_file_path, String::default());
grouped_arrow_file
.entry(key)
.or_default()
.push(arrow_file_path);
}

grouped_arrow_file
}

pub fn arrow_files_grouped_exclude_time(
&self,
exclude: NaiveDateTime,
src/utils/mod.rs (7 changes: 0 additions & 7 deletions)
@@ -37,12 +37,6 @@ use std::collections::HashMap;
use std::env;
use tracing::debug;
use url::Url;
#[allow(dead_code)]
pub fn hostname() -> Option<String> {
hostname::get()
.ok()
.and_then(|hostname| hostname.into_string().ok())
}

pub fn hostname_unchecked() -> String {
hostname::get().unwrap().into_string().unwrap()
@@ -95,7 +89,6 @@ pub struct TimePeriod {
data_granularity: u32,
}

#[allow(dead_code)]
impl TimePeriod {
pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, data_granularity: u32) -> Self {
Self {