Skip to content

Commit f3e3afa

Browse files
committed
impl ingester metadata migration
1 parent 23d9e01 commit f3e3afa

File tree

5 files changed

+72
-19
lines changed

5 files changed

+72
-19
lines changed

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::handlers::http::middleware::RouteExt;
2424
use crate::localcache::LocalCacheManager;
2525
use crate::metadata;
2626
use crate::metrics;
27+
use crate::migration::metadata_migration::migrate_ingester_metadata;
2728
use crate::rbac;
2829
use crate::rbac::role::Action;
2930
use crate::storage;
@@ -39,6 +40,10 @@ use super::IngestorMetadata;
3940
use super::OpenIdClient;
4041
use super::ParseableServer;
4142

43+
use crate::{
44+
handlers::http::{base_path, cross_origin_config},
45+
option::CONFIG,
46+
};
4247
use actix_web::body::MessageBody;
4348
use actix_web::Scope;
4449
use actix_web::{web, App, HttpServer};
@@ -50,14 +55,9 @@ use itertools::Itertools;
5055
use once_cell::sync::Lazy;
5156
use relative_path::RelativePathBuf;
5257

53-
use crate::{
54-
handlers::http::{base_path, cross_origin_config},
55-
option::CONFIG,
56-
};
57-
5858
/// ! have to use a guard before using it
5959
pub static INGESTOR_META: Lazy<IngestorMetadata> =
60-
Lazy::new(|| staging::get_ingestor_info().expect("dir is readable and writeable"));
60+
Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json"));
6161

6262
#[derive(Default)]
6363
pub struct IngestServer;
@@ -106,6 +106,8 @@ impl ParseableServer for IngestServer {
106106

107107
/// implement the init method will just invoke the initialize method
108108
async fn init(&self) -> anyhow::Result<()> {
109+
migrate_ingester_metadata().await?;
110+
109111
self.validate()?;
110112

111113
// check for querier state. Is it there, or was it there in the past

server/src/migration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*
1818
*/
1919

20-
mod metadata_migration;
20+
pub mod metadata_migration;
2121
mod schema_migration;
2222
mod stream_metadata_migration;
2323

server/src/migration/metadata_migration.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919
use rand::distributions::DistString;
2020
use serde_json::{Map, Value as JsonValue};
2121

22-
use crate::option::CONFIG;
22+
use crate::{
23+
handlers::http::modal::IngestorMetadata,
24+
option::CONFIG,
25+
storage::{object_storage::ingestor_metadata_path, staging},
26+
};
27+
use actix_web::body::MessageBody;
2328

2429
/*
2530
v1
@@ -118,3 +123,34 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
118123

119124
storage_metadata
120125
}
126+
127+
pub async fn migrate_ingester_metadata() -> anyhow::Result<()> {
128+
let imp = ingestor_metadata_path(None);
129+
let bytes = CONFIG.storage().get_object_store().get_object(&imp).await?;
130+
let mut json = serde_json::from_slice::<JsonValue>(&bytes)?;
131+
let meta = json
132+
.as_object_mut()
133+
.ok_or_else(|| anyhow::anyhow!("Unable to parse Ingester Metadata"))?;
134+
let fp = meta.get("flight_port");
135+
136+
if fp.is_none() {
137+
meta.insert(
138+
"flight_port".to_owned(),
139+
JsonValue::String(CONFIG.parseable.flight_port.to_string()),
140+
);
141+
}
142+
let bytes = serde_json::to_string(&json)?
143+
.try_into_bytes()
144+
.map_err(|err| anyhow::anyhow!(err))?;
145+
146+
let resource: IngestorMetadata = serde_json::from_value(json)?;
147+
staging::put_ingestor_info(resource.clone())?;
148+
149+
CONFIG
150+
.storage()
151+
.get_object_store()
152+
.put_object(&imp, bytes)
153+
.await?;
154+
155+
Ok(())
156+
}

server/src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ pub async fn commit_schema_to_storage(
530530
}
531531

532532
#[inline(always)]
533-
fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
533+
pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
534534
serde_json::to_vec(any)
535535
.map(|any| any.into())
536536
.expect("serialize cannot fail")

server/src/storage/staging.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,6 @@
1717
*
1818
*/
1919

20-
use std::{
21-
collections::HashMap,
22-
fs,
23-
path::{Path, PathBuf},
24-
process,
25-
sync::Arc,
26-
};
27-
2820
use crate::{
2921
event::DEFAULT_TIMESTAMP_KEY,
3022
handlers::http::modal::{ingest_server::INGESTOR_META, IngestorMetadata, DEFAULT_VERSION},
@@ -36,6 +28,7 @@ use crate::{
3628
hostname_unchecked,
3729
},
3830
};
31+
use anyhow::anyhow;
3932
use arrow_schema::{ArrowError, Schema};
4033
use base64::Engine;
4134
use chrono::{NaiveDateTime, Timelike, Utc};
@@ -47,6 +40,14 @@ use parquet::{
4740
format::SortingColumn,
4841
schema::types::ColumnPath,
4942
};
43+
use serde_json::Value as JsonValue;
44+
use std::{
45+
collections::HashMap,
46+
fs,
47+
path::{Path, PathBuf},
48+
process,
49+
sync::Arc,
50+
};
5051

5152
const ARROW_FILE_EXTENSION: &str = "data.arrows";
5253
const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -318,7 +319,21 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
318319

319320
if flag {
320321
// get the ingestor metadata from staging
321-
let mut meta: IngestorMetadata = serde_json::from_slice(&std::fs::read(path)?)?;
322+
let mut meta: JsonValue = serde_json::from_slice(&std::fs::read(path)?)?;
323+
324+
// migrate the staging meta
325+
let obj = meta
326+
.as_object_mut()
327+
.ok_or_else(|| anyhow!("Could Not parse Ingestor Metadata Json"))?;
328+
329+
if obj.get("flight_port").is_none() {
330+
obj.insert(
331+
"flight_port".to_owned(),
332+
JsonValue::String(CONFIG.parseable.flight_port.to_string()),
333+
);
334+
}
335+
336+
let mut meta: IngestorMetadata = serde_json::from_value(meta)?;
322337

323338
// compare url endpoint and port
324339
if meta.domain_name != url {
@@ -379,7 +394,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
379394
/// # Parameters
380395
///
381396
/// * `ingestor_info`: The ingestor info to be stored.
382-
fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> {
397+
pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> {
383398
let path = PathBuf::from(&CONFIG.parseable.local_staging_path);
384399
let file_name = format!("ingestor.{}.json", info.ingestor_id);
385400
let file_path = path.join(file_name);

0 commit comments

Comments
 (0)