Skip to content

Commit 337dae8

Browse files
committed
rm dead code
1 parent aa6927f commit 337dae8

File tree

3 files changed

+18
-54
lines changed

3 files changed

+18
-54
lines changed

server/src/handlers/http.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,12 @@ pub fn base_path_without_preceding_slash() -> String {
5959
base_path().trim_start_matches('/').to_string()
6060
}
6161

62-
6362
pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
6463
let mut res = vec![];
6564
let ima = QueryServer::get_ingester_info().await.unwrap();
6665

6766
for im in ima {
68-
// todo:
67+
// TODO: when you rebase the code from the Cluster Info PR update this uri generation
6968
let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name);
7069
let reqw = reqwest::Client::new()
7170
.get(uri)
@@ -89,6 +88,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
8988
let ima = QueryServer::get_ingester_info().await.unwrap();
9089

9190
for im in ima.iter() {
91+
// TODO: when you rebase the code from the Cluster Info PR update this uri generation
9292
let uri = format!("{}api/v1/{}", im.domain_name, "query");
9393
let reqw = reqwest::Client::new()
9494
.post(uri)
@@ -100,6 +100,8 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
100100

101101
if reqw.status().is_success() {
102102
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
103+
// the value returned is an array of json objects
104+
// so it needs to be flattened
103105
if let Some(arr) = v.as_array() {
104106
for val in arr {
105107
res.push(val.clone())

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -125,29 +125,7 @@ impl IngestServer {
125125
.service(Self::logstream_api()),
126126
)
127127
.service(Server::get_liveness_factory())
128-
.service(Server::get_readiness_factory())
129-
.service(Self::get_metrics_webscope());
130-
}
131-
132-
fn get_metrics_webscope() -> Scope {
133-
web::scope("/logstream").service(
134-
web::scope("/{logstream}")
135-
.service(
136-
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
137-
web::resource("/schema").route(
138-
web::get()
139-
.to(logstream::schema)
140-
.authorize_for_stream(Action::GetSchema),
141-
),
142-
)
143-
.service(
144-
web::resource("/stats").route(
145-
web::get()
146-
.to(logstream::get_stats)
147-
.authorize_for_stream(Action::GetStats),
148-
),
149-
),
150-
)
128+
.service(Server::get_readiness_factory());
151129
}
152130

153131
fn logstream_api() -> Scope {

server/src/handlers/http/query.rs

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
22-
use arrow_json::reader::infer_json_schema_from_iterator;
2322
use chrono::{DateTime, Utc};
2423
use datafusion::error::DataFusionError;
2524
use datafusion::execution::context::SessionState;
@@ -30,10 +29,7 @@ use std::pin::Pin;
3029
use std::sync::Arc;
3130
use std::time::Instant;
3231

33-
// Eshan's Code Under test
34-
#[allow(unused_imports)]
3532
use arrow_schema::Schema;
36-
#[allow(unused_imports)]
3733
use crate::handlers::http::send_schema_request;
3834

3935
use crate::event::commit_schema;
@@ -68,40 +64,28 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
6864
let session_state = QUERY_SESSION.state();
6965
let mut query = into_query(&query_request, &session_state).await?;
7066

71-
// if CONFIG.parseable.mode == Mode::Query {
72-
// if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
73-
// let new_schema =
74-
// Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;
67+
if CONFIG.parseable.mode == Mode::Query {
68+
if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
69+
let new_schema =
70+
Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;
7571

76-
// commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
77-
// .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
72+
commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
73+
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
7874

79-
// commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
80-
// .await
81-
// .map_err(|err| {
82-
// QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
83-
// })?;
84-
// }
85-
// }
75+
commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
76+
.await
77+
.map_err(|err| {
78+
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
79+
})?;
80+
}
81+
}
8682

8783
let mmem = if CONFIG.parseable.mode == Mode::Query {
8884
// create a new query to send to the ingestors
8985
if let Some(que) = transform_query_for_ingestor(&query_request) {
9086
let vals = send_request_to_ingestor(&que)
9187
.await
9288
.map_err(|err| QueryError::Custom(err.to_string()))?;
93-
let infered_schema = infer_json_schema_from_iterator(vals.iter().map(Ok)).map_err(|err| {
94-
QueryError::Custom(format!("Error inferring schema from iterator\nError:{err}"))
95-
})?;
96-
97-
commit_schema(&query.table_name().unwrap(), Arc::new(infered_schema.clone()))
98-
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
99-
100-
commit_schema_to_storage(&query.table_name().unwrap(), infered_schema)
101-
.await
102-
.map_err(|err| {
103-
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
104-
})?;
10589

10690
Some(vals)
10791
} else {

0 commit comments

Comments
 (0)