
Commit f9f4ab3

remove unused, update parseable server log test
1 parent 8eceae3 · commit f9f4ab3

File tree

5 files changed (+12, -62 lines)

src/event/format/known_schema.rs

Lines changed: 6 additions & 12 deletions
@@ -515,25 +515,19 @@ mod tests {
     }
 
     #[test]
-    fn test_rust_server_logs() {
+    fn test_parseable_server_logs() {
         let processor = EventProcessor::new(FORMATS_JSON);
         let schema = processor
             .schema_definitions
-            .get("rust_server_logs")
+            .get("parseable_server_logs")
             .unwrap();
 
         let test_logs = vec![
             // Current parseable format with ThreadId
-            "2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live",
-            "2025-09-06T10:44:12.62276265Z ERROR actix-rt|system:0|arbiter:17 ThreadId(163) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named a. Valid fields are serverlogs.log\")",
-            "2025-09-06T05:16:46.092071318Z ERROR actix-rt|system:0|arbiter:21 ThreadId(167) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named ansible.host.ip\")",
-            "2025-09-06T11:22:07.500864363Z WARN main ThreadId(01) parseable_enterprise:70: Received shutdown signal, notifying server to shut down...",
-            // env_logger format
-            "[2025-09-06T10:43:01.628980875Z INFO parseable::storage] Initializing storage backend",
-            "[2025-09-06T10:43:01.628980875Z ERROR parseable::http::ingest] Failed to parse JSON",
-            // Simple tracing format (no ThreadId)
-            "2025-09-06T10:43:01.628980875Z INFO parseable::storage::s3: Storage configured successfully",
-            "2025-09-06T10:43:01.628980875Z DEBUG parseable::query::engine: Query executed in 45ms",
+            "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live",
+            "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:44:12.62276265Z ERROR actix-rt|system:0|arbiter:17 ThreadId(163) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named a. Valid fields are serverlogs.log\")",
+            "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T05:16:46.092071318Z ERROR actix-rt|system:0|arbiter:21 ThreadId(167) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named ansible.host.ip\")",
+            "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T11:22:07.500864363Z WARN main ThreadId(01) parseable_enterprise:70: Received shutdown signal, notifying server to shut down...",
         ];
 
         for (i, log_text) in test_logs.iter().enumerate() {
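
Note: the renamed test now exercises only the "parseable_server_logs" format, whose sample lines prepend two ULID-like identifiers ahead of the usual tracing output (timestamp, level, thread name, ThreadId, module path, message). A minimal standalone sketch of matching that shape follows; the pattern is illustrative only, not the actual definition from FORMATS_JSON.

// Illustrative sketch: match the new test-log shape with a hypothetical regex.
// The real extraction rules live in the FORMATS_JSON schema definitions.
use regex::Regex;

fn main() {
    // Assumed shape: two 26-char ULIDs, timestamp, level, thread, ThreadId, module:line, message.
    let re = Regex::new(r"^(?P<id1>[0-9A-Z]{26}) (?P<id2>[0-9A-Z]{26}) (?P<timestamp>\S+) (?P<level>\w+) (?P<thread>\S+) ThreadId\((?P<thread_id>\d+)\) (?P<module>[\w:]+):(?P<line>\d+): (?P<message>.*)$").unwrap();

    let log = "01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live";
    let caps = re.captures(log).expect("line should match");
    assert_eq!(&caps["level"], "WARN");
    assert_eq!(&caps["module"], "parseable::handlers::http::cluster");
    assert_eq!(&caps["line"], "919");
}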

src/query/stream_schema_provider.rs

Lines changed: 4 additions & 30 deletions
@@ -16,7 +16,7 @@
  *
  */
 
-use std::{any::Any, collections::HashMap, ops::Bound, path::PathBuf, sync::Arc};
+use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 
 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
@@ -46,13 +46,12 @@ use datafusion::{
 };
 use futures_util::TryFutureExt;
 use itertools::Itertools;
-use relative_path::RelativePathBuf;
 
 use crate::{
     catalog::{
         ManifestFile, Snapshot as CatalogSnapshot,
         column::{Column, TypedStatistics},
-        manifest::{File, Manifest},
+        manifest::File,
         snapshot::{ManifestItem, Snapshot},
     },
     event::DEFAULT_TIMESTAMP_KEY,
@@ -64,7 +63,7 @@ use crate::{
     },
     option::Mode,
     parseable::{PARSEABLE, STREAM_EXISTS},
-    storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat},
+    storage::{ObjectStorage, ObjectStoreFormat},
 };
 
 use super::listing_table_builder::ListingTableBuilder;
@@ -869,37 +868,12 @@ fn extract_timestamp_bound(
             DateTime::from_timestamp_nanos(*value).naive_utc(),
         )),
         ScalarValue::Utf8(Some(str_value)) if is_time_partition => {
-            Some((binexpr.op, str_value.parse::<NaiveDateTime>().unwrap()))
+            Some((binexpr.op, str_value.parse().unwrap()))
        }
         _ => None,
     }
 }
 
-pub async fn collect_manifest_files(
-    storage: Arc<dyn ObjectStorage>,
-    manifest_urls: Vec<String>,
-) -> Result<Vec<Manifest>, ObjectStorageError> {
-    let mut tasks = Vec::new();
-    manifest_urls.into_iter().for_each(|path| {
-        let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
-        let storage = Arc::clone(&storage);
-        tasks.push(tokio::task::spawn(async move {
-            storage.get_object(&path).await
-        }));
-    });
-
-    let mut op = Vec::new();
-    for task in tasks {
-        let file = task.await??;
-        op.push(file);
-    }
-
-    Ok(op
-        .into_iter()
-        .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
-        .collect())
-}
-
 // Extract start time and end time from filter predicate
 pub fn extract_primary_filter(
     filters: &[Expr],
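
Note: two independent cleanups here. The now-unused collect_manifest_files helper is deleted along with its RelativePathBuf, PathBuf, Manifest, and ObjectStorageError imports, and extract_timestamp_bound drops a redundant turbofish: the target type of parse() is already pinned to NaiveDateTime by the other match arms (DateTime::from_timestamp_nanos(..).naive_utc() yields a NaiveDateTime). A quick standalone illustration of that inference point, assuming chrono's NaiveDateTime as in the file:

// Sketch only: why the turbofish could be dropped. When the result's type is
// fixed elsewhere, `parse()` infers it; both calls below do the same work.
use chrono::NaiveDateTime;

fn main() {
    let s = "2025-09-06T10:43:01";
    // Explicit turbofish:
    let a = s.parse::<NaiveDateTime>().unwrap();
    // Inferred from the annotated binding; same parse, less noise:
    let b: NaiveDateTime = s.parse().unwrap();
    assert_eq!(a, b);
}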

src/storage/azure_blob.rs

Lines changed: 1 addition & 10 deletions
@@ -224,7 +224,7 @@ impl BlobStore {
 
         match resp {
             Ok(resp) => {
-                let body: Bytes = resp.bytes().await.unwrap();
+                let body: Bytes = resp.bytes().await?;
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["azure_blob", "GET", "200"])
                     .observe(elapsed);
@@ -1062,15 +1062,6 @@ impl ObjectStorage for BlobStore {
         Ok(minutes)
     }
 
-    // async fn list_manifest_files(
-    //     &self,
-    //     stream_name: &str,
-    // ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
-    //     let files = self._list_manifest_files(stream_name).await?;
-
-    //     Ok(files)
-    // }
-
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         Ok(self._upload_file(key, path).await?)
     }

src/storage/gcs.rs

Lines changed: 1 addition & 1 deletion
@@ -185,7 +185,7 @@ impl Gcs {
 
         match resp {
             Ok(resp) => {
-                let body: Bytes = resp.bytes().await.unwrap();
+                let body: Bytes = resp.bytes().await?;
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["gcs", "GET", "200"])
                     .observe(elapsed);
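
Note: in both azure_blob.rs and gcs.rs the response-body read switches from .unwrap() to ?, so a failed byte read now surfaces as an Err to the caller instead of panicking the thread. A self-contained sketch of the pattern; the function and types below are illustrative stand-ins, not the crate's own:

// Sketch of error propagation with `?` inside a function returning Result.
use std::num::ParseIntError;

fn parse_port(raw: &str) -> Result<u16, ParseIntError> {
    // With `?`, malformed input becomes an Err for the caller to handle;
    // `.unwrap()` here would abort the whole process instead.
    let port: u16 = raw.trim().parse()?;
    Ok(port)
}

fn main() {
    assert_eq!(parse_port("8010"), Ok(8010));
    assert!(parse_port("not-a-port").is_err());
}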

src/storage/s3.rs

Lines changed: 0 additions & 9 deletions
@@ -1159,15 +1159,6 @@ impl ObjectStorage for S3 {
         Ok(minutes)
     }
 
-    // async fn list_manifest_files(
-    //     &self,
-    //     stream_name: &str,
-    // ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
-    //     let files = self._list_manifest_files(stream_name).await?;
-
-    //     Ok(files)
-    // }
-
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         Ok(self._upload_file(key, path).await?)
     }

0 commit comments
