Skip to content

Commit 932d423

Browse files
committed
fix: bugs causing server to not start in ingest mode
1 parent f3e3afa commit 932d423

File tree

5 files changed

+25
-9
lines changed

5 files changed

+25
-9
lines changed

server/src/handlers/airplane.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ impl FlightService for AirServiceImpl {
156156

157157
let time_delta = query.end - Utc::now();
158158

159-
let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 {
159+
let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 1 {
160160
let sql = format!(
161161
"{}\"query\": \"select * from {}\"{}",
162162
L_CURLY, &stream_name, R_CURLY

server/src/handlers/http/cluster/utils.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*
1717
*/
1818

19-
use crate::handlers::http::{logstream::error::StreamError, modal::IngestorMetadata};
19+
use crate::handlers::http::{
20+
base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
21+
};
2022
use actix_web::http::header;
2123
use chrono::{DateTime, Utc};
2224
use http::StatusCode;
@@ -161,7 +163,11 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
161163
}
162164

163165
pub async fn check_liveness(domain_name: &str) -> bool {
164-
let uri = match Url::parse(&format!("{}liveness", domain_name)) {
166+
let uri = match Url::parse(&format!(
167+
"{}{}/liveness",
168+
domain_name,
169+
base_path_without_preceding_slash()
170+
)) {
165171
Ok(uri) => uri,
166172
Err(err) => {
167173
log::error!("Node Indentifier Failed To Parse: {}", err);

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ impl ParseableServer for IngestServer {
106106

107107
/// implement the init method will just invoke the initialize method
108108
async fn init(&self) -> anyhow::Result<()> {
109-
migrate_ingester_metadata().await?;
110-
111109
self.validate()?;
112110

113111
// check for querier state. Is it there, or was it there in the past
@@ -116,6 +114,7 @@ impl ParseableServer for IngestServer {
116114
self.validate_credentials().await?;
117115

118116
let metadata = storage::resolve_parseable_metadata().await?;
117+
119118
banner::print(&CONFIG, &metadata).await;
120119
rbac::map::init(&metadata);
121120
// set the info in the global metadata
@@ -217,6 +216,7 @@ impl IngestServer {
217216

218217
// create the ingestor metadata and put the .ingestor.json file in the object store
219218
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
219+
migrate_ingester_metadata().await?;
220220
let store = CONFIG.storage().get_object_store();
221221

222222
// find the meta file in staging if not generate new metadata

server/src/migration/metadata_migration.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,13 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
126126

127127
pub async fn migrate_ingester_metadata() -> anyhow::Result<()> {
128128
let imp = ingestor_metadata_path(None);
129-
let bytes = CONFIG.storage().get_object_store().get_object(&imp).await?;
129+
let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
130+
Ok(bytes) => bytes,
131+
Err(_) => {
132+
log::debug!("No metadata found for ingester. So migration is not required");
133+
return Ok(());
134+
}
135+
};
130136
let mut json = serde_json::from_slice::<JsonValue>(&bytes)?;
131137
let meta = json
132138
.as_object_mut()

server/src/utils/arrow/flight.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use tonic::transport::Channel;
1515

1616
pub fn get_query_from_ticket(req: Request<Ticket>) -> Result<QueryJson, Status> {
1717
if CONFIG.parseable.mode == Mode::Ingest {
18-
let query = serde_json::from_slice::<JsonValue>(&req.into_inner().ticket)
18+
let inner = req.into_inner().ticket;
19+
dbg!(&inner);
20+
let query = serde_json::from_slice::<JsonValue>(&inner)
1921
.map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
2022
.as_str()
2123
.ok_or_else(|| Status::failed_precondition("query is not valid string"))?
@@ -31,8 +33,10 @@ pub fn get_query_from_ticket(req: Request<Ticket>) -> Result<QueryJson, Status>
3133
})
3234
} else {
3335
Ok(
34-
serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
35-
.map_err(|err| Status::internal(err.to_string()))?,
36+
serde_json::from_slice::<QueryJson>(&req.into_inner().ticket).map_err(|err| {
37+
dbg!(&err);
38+
Status::internal(err.to_string())
39+
})?,
3640
)
3741
}
3842
}

0 commit comments

Comments
 (0)