Skip to content

Commit 620eecb

Browse files
committed
feat: migration of metadata files to separate directories
1. Update the server initialization steps 2. Separate out the logic to fetch directories with old streams and new streams 3. Update the ObjectStorage trait to reflect the same 4. Fix the migration logic guard for Ingest Servers
1 parent 5092caf commit 620eecb

File tree

10 files changed

+267
-41
lines changed

10 files changed

+267
-41
lines changed

server/src/handlers/http/modal/query_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ impl ParseableServer for QueryServer {
9898
/// implementation of init should just invoke a call to initialize
9999
async fn init(&self) -> anyhow::Result<()> {
100100
self.validate()?;
101-
migration::meta_file_migration(&CONFIG).await?;
101+
migration::run_file_migration(&CONFIG).await?;
102+
CONFIG.validate_storage().await?;
102103
migration::run_metadata_migration(&CONFIG).await?;
103104
let metadata = storage::resolve_parseable_metadata().await?;
104105
banner::print(&CONFIG, &metadata).await;

server/src/handlers/http/modal/server.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ impl ParseableServer for Server {
141141
/// implementation of init should just invoke a call to initialize
142142
async fn init(&self) -> anyhow::Result<()> {
143143
self.validate()?;
144+
migration::run_file_migration(&CONFIG).await?;
145+
CONFIG.validate_storage().await?;
146+
migration::run_metadata_migration(&CONFIG).await?;
147+
let metadata = storage::resolve_parseable_metadata().await?;
148+
banner::print(&CONFIG, &metadata).await;
149+
rbac::map::init(&metadata);
150+
metadata.set_global();
144151
self.initialize().await
145152
}
146153

@@ -405,13 +412,6 @@ impl Server {
405412
}
406413

407414
async fn initialize(&self) -> anyhow::Result<()> {
408-
migration::meta_file_migration(&CONFIG).await?;
409-
migration::run_metadata_migration(&CONFIG).await?;
410-
let metadata = storage::resolve_parseable_metadata().await?;
411-
banner::print(&CONFIG, &metadata).await;
412-
rbac::map::init(&metadata);
413-
metadata.set_global();
414-
415415
if let Some(cache_manager) = LocalCacheManager::global() {
416416
cache_manager
417417
.validate(CONFIG.parseable.local_cache_size)

server/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
5656
#[actix_web::main]
5757
async fn main() -> anyhow::Result<()> {
5858
env_logger::init();
59-
CONFIG.validate_storage().await?;
6059

6160
// these are empty ptrs so mem footprint should be minimal
6261
let server: Arc<dyn ParseableServer> = match CONFIG.parseable.mode {

server/src/migration.rs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@ mod metadata_migration;
2121
mod schema_migration;
2222
mod stream_metadata_migration;
2323

24-
use std::fs::OpenOptions;
24+
use std::{fs::OpenOptions, sync::Arc};
2525

2626
use bytes::Bytes;
27+
use itertools::Itertools;
2728
use relative_path::RelativePathBuf;
2829
use serde::Serialize;
2930

3031
use crate::{
3132
option::Config,
3233
storage::{
33-
object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME
34+
object_storage::{parseable_json_path, stream_json_path},
35+
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
36+
SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
3437
},
3538
};
3639

@@ -120,7 +123,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
120123
.put_object(&path, to_bytes(&new_stream_metadata))
121124
.await?;
122125

123-
let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]);
126+
let schema_path =
127+
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
124128
let schema = storage.get_object(&schema_path).await?;
125129
let schema = serde_json::from_slice(&schema).ok();
126130
let map = schema_migration::v1_v3(schema)?;
@@ -132,7 +136,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
132136
.put_object(&path, to_bytes(&new_stream_metadata))
133137
.await?;
134138

135-
let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]);
139+
let schema_path =
140+
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
136141
let schema = storage.get_object(&schema_path).await?;
137142
let schema = serde_json::from_slice(&schema)?;
138143
let map = schema_migration::v2_v3(schema)?;
@@ -204,7 +209,7 @@ pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> an
204209
Ok(())
205210
}
206211

207-
pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
212+
pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> {
208213
let object_store = config.storage().get_object_store();
209214

210215
let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
@@ -217,6 +222,16 @@ pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
217222
return Err(err.into());
218223
}
219224

225+
run_meta_file_migration(&object_store, old_meta_file_path).await?;
226+
run_stream_files_migration(object_store).await?;
227+
228+
Ok(())
229+
}
230+
231+
async fn run_meta_file_migration(
232+
object_store: &Arc<dyn ObjectStorage + Send>,
233+
old_meta_file_path: RelativePathBuf,
234+
) -> anyhow::Result<()> {
220235
log::info!("Migrating metadata files to new location");
221236

222237
// get the list of all meta files
@@ -227,10 +242,13 @@ pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
227242
match object_store.get_object(&file).await {
228243
Ok(bytes) => {
229244
// we can unwrap here because we know the file exists
230-
let new_path = RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, file.file_name().unwrap()]);
245+
let new_path = RelativePathBuf::from_iter([
246+
PARSEABLE_ROOT_DIRECTORY,
247+
file.file_name().unwrap(),
248+
]);
231249
object_store.put_object(&new_path, bytes).await?;
232250
object_store.delete_object(&file).await?;
233-
},
251+
}
234252
Err(err) => {
235253
// if error is not a no such key error, something weird happened
236254
// so return the error
@@ -242,4 +260,40 @@ pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
242260
}
243261

244262
Ok(())
245-
}
263+
}
264+
265+
async fn run_stream_files_migration(
266+
object_store: Arc<dyn ObjectStorage + Send>,
267+
) -> anyhow::Result<()> {
268+
let streams = object_store
269+
.list_old_streams()
270+
.await?
271+
.into_iter()
272+
.map(|stream| stream.name)
273+
.collect_vec();
274+
275+
for stream in streams {
276+
let paths = object_store.get_stream_file_paths(&stream).await?;
277+
278+
for path in paths {
279+
match object_store.get_object(&path).await {
280+
Ok(bytes) => {
281+
let new_path = RelativePathBuf::from_iter([
282+
stream.as_str(),
283+
STREAM_ROOT_DIRECTORY,
284+
path.file_name().unwrap(),
285+
]);
286+
object_store.put_object(&new_path, bytes).await?;
287+
object_store.delete_object(&path).await?;
288+
}
289+
Err(err) => {
290+
if !matches!(err, ObjectStorageError::NoSuchKey(_)) {
291+
return Err(err.into());
292+
}
293+
}
294+
}
295+
}
296+
}
297+
298+
Ok(())
299+
}

server/src/option.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::path::PathBuf;
2626
use std::sync::Arc;
2727

2828
use crate::cli::Cli;
29-
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
29+
use crate::storage::object_storage::parseable_json_path;
3030
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
3131
pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
3232
pub const JOIN_COMMUNITY: &str =
@@ -102,7 +102,7 @@ impl Config {
102102
// if the proper data directory is provided, or s3 bucket is provided etc
103103
pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> {
104104
let obj_store = self.storage.get_object_store();
105-
let rel_path = relative_path::RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
105+
let rel_path = parseable_json_path();
106106

107107
let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();
108108

server/src/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use self::staging::StorageDir;
4343
// metadata file names in a Stream prefix
4444
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
4545
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
46+
pub const STREAM_ROOT_DIRECTORY: &str = ".stream";
4647
pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
4748
pub const SCHEMA_FILE_NAME: &str = ".schema";
4849
pub const ALERT_FILE_NAME: &str = ".alert.json";

server/src/storage/localfs.rs

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::option::validation;
3636

3737
use super::{
3838
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
39-
STREAM_METADATA_FILE_NAME,
39+
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
4040
};
4141

4242
#[derive(Debug, Clone, clap::Args)]
@@ -134,7 +134,7 @@ impl ObjectStorage for LocalFS {
134134
if flag {
135135
path_arr.push(
136136
RelativePathBuf::from_path(entry.path().file_name().unwrap())
137-
.map_err(|err| ObjectStorageError::Custom(err.to_string()))?,
137+
.map_err(ObjectStorageError::PathError)?,
138138
);
139139
}
140140
}
@@ -147,6 +147,48 @@ impl ObjectStorage for LocalFS {
147147
Ok(path_arr)
148148
}
149149

150+
async fn get_stream_file_paths(
151+
&self,
152+
stream_name: &str,
153+
) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
154+
let time = Instant::now();
155+
let mut path_arr = vec![];
156+
157+
// = data/stream_name
158+
let stream_dir_path = self.path_in_root(&RelativePathBuf::from(stream_name));
159+
let mut entries = fs::read_dir(&stream_dir_path).await?;
160+
161+
while let Some(entry) = entries.next_entry().await? {
162+
let flag = entry
163+
.path()
164+
.file_name()
165+
.unwrap_or_default()
166+
.to_str()
167+
.unwrap_or_default()
168+
.contains("ingester");
169+
170+
if flag {
171+
path_arr.push(RelativePathBuf::from_iter([
172+
stream_name,
173+
entry.path().file_name().unwrap().to_str().unwrap(),
174+
]));
175+
}
176+
}
177+
178+
path_arr.push(RelativePathBuf::from_iter([
179+
stream_name,
180+
STREAM_METADATA_FILE_NAME,
181+
]));
182+
path_arr.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]));
183+
184+
let time = time.elapsed().as_secs_f64();
185+
REQUEST_RESPONSE_TIME
186+
.with_label_values(&["GET", "200"]) // this might not be the right status code
187+
.observe(time);
188+
189+
Ok(path_arr)
190+
}
191+
150192
async fn get_objects(
151193
&self,
152194
base_path: Option<&RelativePath>,
@@ -253,6 +295,26 @@ impl ObjectStorage for LocalFS {
253295
Ok(logstreams)
254296
}
255297

298+
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
299+
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY];
300+
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
301+
let entries: Vec<DirEntry> = directories.try_collect().await?;
302+
let entries = entries
303+
.into_iter()
304+
.map(|entry| dir_with_old_stream(entry, ignore_dir));
305+
306+
let logstream_dirs: Vec<Option<String>> =
307+
FuturesUnordered::from_iter(entries).try_collect().await?;
308+
309+
let logstreams = logstream_dirs
310+
.into_iter()
311+
.flatten()
312+
.map(|name| LogStream { name })
313+
.collect();
314+
315+
Ok(logstreams)
316+
}
317+
256318
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
257319
let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
258320
.try_collect::<Vec<DirEntry>>()
@@ -324,7 +386,7 @@ impl ObjectStorage for LocalFS {
324386
}
325387
}
326388

327-
async fn dir_with_stream(
389+
async fn dir_with_old_stream(
328390
entry: DirEntry,
329391
ignore_dirs: &[&str],
330392
) -> Result<Option<String>, ObjectStorageError> {
@@ -358,6 +420,42 @@ async fn dir_with_stream(
358420
}
359421
}
360422

423+
async fn dir_with_stream(
424+
entry: DirEntry,
425+
ignore_dirs: &[&str],
426+
) -> Result<Option<String>, ObjectStorageError> {
427+
let dir_name = entry
428+
.path()
429+
.file_name()
430+
.expect("valid path")
431+
.to_str()
432+
.expect("valid unicode")
433+
.to_owned();
434+
435+
if ignore_dirs.contains(&dir_name.as_str()) {
436+
return Ok(None);
437+
}
438+
439+
if entry.file_type().await?.is_dir() {
440+
let path = entry.path();
441+
442+
// even in ingest mode, we should only look for the global stream metadata file
443+
let stream_json_path = path
444+
.join(STREAM_ROOT_DIRECTORY)
445+
.join(STREAM_METADATA_FILE_NAME);
446+
447+
if stream_json_path.exists() {
448+
Ok(Some(dir_name))
449+
} else {
450+
let err: Box<dyn std::error::Error + Send + Sync + 'static> =
451+
format!("found {}", entry.path().display()).into();
452+
Err(ObjectStorageError::UnhandledError(err))
453+
}
454+
} else {
455+
Ok(None)
456+
}
457+
}
458+
361459
async fn dir_name(entry: DirEntry) -> Result<Option<String>, ObjectStorageError> {
362460
if entry.file_type().await?.is_dir() {
363461
let dir_name = entry

0 commit comments

Comments
 (0)