Skip to content

Commit 5092caf

Browse files
committed
Impl migration of server metadata files on Object Store
1 parent 754d813 commit 5092caf

File tree

9 files changed

+164
-57
lines changed

9 files changed

+164
-57
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::handlers::http::logstream::error::StreamError;
2323
use crate::option::CONFIG;
2424

2525
use crate::metrics::prom_utils::Metrics;
26+
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
2627
use actix_web::http::header;
2728
use actix_web::Responder;
2829
use http::StatusCode;
@@ -334,7 +335,7 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
334335
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
335336
let store = CONFIG.storage().get_object_store();
336337

337-
let root_path = RelativePathBuf::from("");
338+
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
338339
let arr = store
339340
.get_objects(Some(&root_path))
340341
.await?

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,17 @@ impl ParseableServer for IngestServer {
102102
/// Runs the start-up checks for the ingest server and then hands over to
/// `initialize`.
///
/// In order: validates the server, checks the querier state, validates the
/// credentials, resolves the parseable metadata, prints the banner, initializes
/// the RBAC map from that metadata and publishes the metadata globally. Any
/// failing step aborts start-up by returning its error.
103103
async fn init(&self) -> anyhow::Result<()> {
104104
self.validate()?;
105+
// Check the querier state: is a querier present now, or was one present in the past?
106+
self.check_querier_state().await?;
107+
// Per the original note this fetches the .parseable.json file into staging
// NOTE(review): the helper's body is not visible here — confirm.
108+
self.validate_credentials().await?;
109+
110+
let metadata = storage::resolve_parseable_metadata().await?;
111+
banner::print(&CONFIG, &metadata).await;
112+
rbac::map::init(&metadata);
113+
// Store the resolved metadata in the global metadata
114+
metadata.set_global();
115+
105116
self.initialize().await
106117
}
107118

@@ -267,19 +278,6 @@ impl IngestServer {
267278
}
268279

269280
async fn initialize(&self) -> anyhow::Result<()> {
270-
// check for querier state. Is it there, or was it there in the past
271-
self.check_querier_state().await?;
272-
// to get the .parseable.json file in staging
273-
self.validate_credentials().await?;
274-
275-
let metadata = storage::resolve_parseable_metadata().await?;
276-
banner::print(&CONFIG, &metadata).await;
277-
278-
rbac::map::init(&metadata);
279-
280-
// set the info in the global metadata
281-
metadata.set_global();
282-
283281
if let Some(cache_manager) = LocalCacheManager::global() {
284282
cache_manager
285283
.validate(CONFIG.parseable.local_cache_size)

server/src/handlers/http/modal/query_server.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ impl ParseableServer for QueryServer {
9898
/// Runs the start-up steps for the query server and then hands over to
/// `initialize`.
///
/// In order: validates the server, runs the metadata file migration and the
/// metadata migration, resolves the parseable metadata, prints the banner,
/// initializes the RBAC map and publishes the metadata globally. Any failing
/// step aborts start-up by returning its error.
9999
async fn init(&self) -> anyhow::Result<()> {
100100
self.validate()?;
101+
migration::meta_file_migration(&CONFIG).await?;
102+
migration::run_metadata_migration(&CONFIG).await?;
103+
let metadata = storage::resolve_parseable_metadata().await?;
104+
banner::print(&CONFIG, &metadata).await;
105+
// Initialize the RBAC map from the resolved metadata
106+
rbac::map::init(&metadata);
107+
// Keep the metadata available in memory via the global
108+
metadata.set_global();
101109
self.initialize().await
102110
}
103111

@@ -153,18 +161,6 @@ impl QueryServer {
153161

154162
/// initialize the server, run migrations as needed and start the server
155163
async fn initialize(&self) -> anyhow::Result<()> {
156-
migration::run_metadata_migration(&CONFIG).await?;
157-
158-
let metadata = storage::resolve_parseable_metadata().await?;
159-
160-
banner::print(&CONFIG, &metadata).await;
161-
162-
// initialize the rbac map
163-
rbac::map::init(&metadata);
164-
165-
// keep metadata info in mem
166-
metadata.set_global();
167-
168164
let prometheus = metrics::build_metrics_handler();
169165
CONFIG.storage().register_store_metrics(&prometheus);
170166

@@ -177,7 +173,6 @@ impl QueryServer {
177173

178174
// track all parquet files already in the data directory
179175
storage::retention::load_retention_from_global();
180-
181176
// load data from stats back to prometheus metrics
182177
metrics::fetch_stats_from_storage().await;
183178

server/src/handlers/http/modal/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ impl Server {
405405
}
406406

407407
async fn initialize(&self) -> anyhow::Result<()> {
408+
migration::meta_file_migration(&CONFIG).await?;
408409
migration::run_metadata_migration(&CONFIG).await?;
409410
let metadata = storage::resolve_parseable_metadata().await?;
410411
banner::print(&CONFIG, &metadata).await;

server/src/migration.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use serde::Serialize;
3030
use crate::{
3131
option::Config,
3232
storage::{
33-
object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError,SCHEMA_FILE_NAME,
33+
object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME
3434
},
3535
};
3636

@@ -195,7 +195,6 @@ pub async fn put_remote_metadata(
195195

196196
pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> {
197197
let path = parseable_json_path().to_path(config.staging_dir());
198-
//config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
199198
let mut file = OpenOptions::new()
200199
.create(true)
201200
.truncate(true)
@@ -204,3 +203,43 @@ pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> an
204203
serde_json::to_writer(&mut file, metadata)?;
205204
Ok(())
206205
}
206+
207+
pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
208+
let object_store = config.storage().get_object_store();
209+
210+
let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
211+
212+
// if this errors that means migrations is already done
213+
if let Err(err) = object_store.get_object(&old_meta_file_path).await {
214+
if matches!(err, ObjectStorageError::NoSuchKey(_)) {
215+
return Ok(());
216+
}
217+
return Err(err.into());
218+
}
219+
220+
log::info!("Migrating metadata files to new location");
221+
222+
// get the list of all meta files
223+
let mut meta_files = object_store.get_ingester_meta_file_paths().await?;
224+
meta_files.push(old_meta_file_path);
225+
226+
for file in meta_files {
227+
match object_store.get_object(&file).await {
228+
Ok(bytes) => {
229+
// we can unwrap here because we know the file exists
230+
let new_path = RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, file.file_name().unwrap()]);
231+
object_store.put_object(&new_path, bytes).await?;
232+
object_store.delete_object(&file).await?;
233+
},
234+
Err(err) => {
235+
// if error is not a no such key error, something weird happened
236+
// so return the error
237+
if !matches!(err, ObjectStorageError::NoSuchKey(_)) {
238+
return Err(err.into());
239+
}
240+
}
241+
}
242+
}
243+
244+
Ok(())
245+
}

server/src/storage/localfs.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use bytes::Bytes;
2727
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
2828
use fs_extra::file::CopyOptions;
2929
use futures::{stream::FuturesUnordered, TryStreamExt};
30-
use relative_path::RelativePath;
30+
use relative_path::{RelativePath, RelativePathBuf};
3131
use tokio::fs::{self, DirEntry};
3232
use tokio_stream::wrappers::ReadDirStream;
3333

@@ -114,6 +114,39 @@ impl ObjectStorage for LocalFS {
114114
res
115115
}
116116

117+
async fn get_ingester_meta_file_paths(
118+
&self,
119+
) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
120+
let time = Instant::now();
121+
122+
let mut path_arr = vec![];
123+
let mut entries = fs::read_dir(&self.root).await?;
124+
125+
while let Some(entry) = entries.next_entry().await? {
126+
let flag = entry
127+
.path()
128+
.file_name()
129+
.unwrap_or_default()
130+
.to_str()
131+
.unwrap_or_default()
132+
.contains("ingester");
133+
134+
if flag {
135+
path_arr.push(
136+
RelativePathBuf::from_path(entry.path().file_name().unwrap())
137+
.map_err(|err| ObjectStorageError::Custom(err.to_string()))?,
138+
);
139+
}
140+
}
141+
142+
let time = time.elapsed().as_secs_f64();
143+
REQUEST_RESPONSE_TIME
144+
.with_label_values(&["GET", "200"]) // this might not be the right status code
145+
.observe(time);
146+
147+
Ok(path_arr)
148+
}
149+
117150
async fn get_objects(
118151
&self,
119152
base_path: Option<&RelativePath>,
@@ -183,6 +216,12 @@ impl ObjectStorage for LocalFS {
183216
Ok(())
184217
}
185218

219+
/// Removes the file that `path` resolves to inside the storage root.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be removed.
async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
    let target = self.path_in_root(path);
    Ok(tokio::fs::remove_file(target).await?)
}
224+
186225
async fn check(&self) -> Result<(), ObjectStorageError> {
187226
fs::create_dir_all(&self.root)
188227
.await

server/src/storage/object_storage.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ pub trait ObjectStorage: Sync + 'static {
8282
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
8383
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
8484
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
85-
85+
async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
86+
async fn get_ingester_meta_file_paths(&self) -> Result<Vec<RelativePathBuf>, ObjectStorageError>;
8687
/// Returns the amount of time taken by the `ObjectStore` to perform a get
8788
/// call.
8889
async fn get_latency(&self) -> Duration {

server/src/storage/s3.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
2929
use object_store::limit::LimitStore;
3030
use object_store::path::Path as StorePath;
3131
use object_store::{ClientOptions, ObjectStore};
32-
use relative_path::RelativePath;
32+
use relative_path::{RelativePath, RelativePathBuf};
3333
use tokio::fs::OpenOptions;
3434
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3535

@@ -452,6 +452,29 @@ impl ObjectStorage for S3 {
452452
Ok(res)
453453
}
454454

455+
async fn get_ingester_meta_file_paths(
456+
&self,
457+
) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
458+
let time = Instant::now();
459+
let mut path_arr = vec![];
460+
let mut object_stream = self.client.list(Some(&self.root)).await?;
461+
462+
while let Some(meta) = object_stream.next().await.transpose()? {
463+
let flag = meta.location.filename().unwrap().starts_with("ingester");
464+
465+
if flag {
466+
path_arr.push(RelativePathBuf::from(meta.location.as_ref()));
467+
}
468+
}
469+
470+
let time = time.elapsed().as_secs_f64();
471+
REQUEST_RESPONSE_TIME
472+
.with_label_values(&["GET", "200"])
473+
.observe(time);
474+
475+
Ok(path_arr)
476+
}
477+
455478
async fn put_object(
456479
&self,
457480
path: &RelativePath,
@@ -470,6 +493,10 @@ impl ObjectStorage for S3 {
470493
Ok(())
471494
}
472495

496+
async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
497+
Ok(self.client.delete(&to_object_store_path(path)).await?)
498+
}
499+
473500
async fn check(&self) -> Result<(), ObjectStorageError> {
474501
Ok(self
475502
.client

server/src/storage/store_metadata.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -126,33 +126,39 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
126126
Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server")
127127
}
128128
EnvChange::NewStaging(mut metadata) => {
129-
create_dir_all(CONFIG.staging_dir())?;
130-
metadata.staging = CONFIG.staging_dir().canonicalize()?;
131-
// this flag is set to true so that metadata is copied to staging
132-
overwrite_staging = true;
133-
// overwrite remote in all and query mode
134-
// because staging dir has changed.
135-
match CONFIG.parseable.mode {
136-
Mode::All => {
137-
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
138-
.map_err(|err| {
139-
ObjectStorageError::Custom(err.to_string())
140-
})?;
129+
// if server is started in ingest mode,we need to make sure that query mode has been started
130+
// i.e the metadata is updated to reflect the server mode = Query
131+
if Mode::from_string(&metadata.server_mode).unwrap() == Mode::All || CONFIG.parseable.mode == Mode::Ingest {
132+
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
133+
} else {
134+
create_dir_all(CONFIG.staging_dir())?;
135+
metadata.staging = CONFIG.staging_dir().canonicalize()?;
136+
// this flag is set to true so that metadata is copied to staging
137+
overwrite_staging = true;
138+
// overwrite remote in all and query mode
139+
// because staging dir has changed.
140+
match CONFIG.parseable.mode {
141+
Mode::All => {
142+
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
143+
.map_err(|err| {
144+
ObjectStorageError::Custom(err.to_string())
145+
})?;
146+
overwrite_remote = true;
147+
},
148+
Mode::Query => {
141149
overwrite_remote = true;
142-
},
143-
Mode::Query => {
144-
overwrite_remote = true;
145-
metadata.server_mode = CONFIG.parseable.mode.to_string();
146-
metadata.staging = CONFIG.staging_dir().to_path_buf();
147-
},
148-
Mode::Ingest => {
149-
// if ingest server is started fetch the metadata from remote
150-
// update the server mode for local metadata
151-
metadata.server_mode = CONFIG.parseable.mode.to_string();
152-
metadata.staging = CONFIG.staging_dir().to_path_buf();
153-
},
150+
metadata.server_mode = CONFIG.parseable.mode.to_string();
151+
metadata.staging = CONFIG.staging_dir().to_path_buf();
152+
},
153+
Mode::Ingest => {
154+
// if ingest server is started fetch the metadata from remote
155+
// update the server mode for local metadata
156+
metadata.server_mode = CONFIG.parseable.mode.to_string();
157+
metadata.staging = CONFIG.staging_dir().to_path_buf();
158+
},
159+
}
160+
Ok(metadata)
154161
}
155-
Ok(metadata)
156162
}
157163
EnvChange::CreateBoth => {
158164
create_dir_all(CONFIG.staging_dir())?;

0 commit comments

Comments
 (0)