Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use actix_web::http::header;
use actix_web::{HttpRequest, Responder};
use http::StatusCode;
Expand Down Expand Up @@ -338,7 +339,7 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
let store = CONFIG.storage().get_object_store();

let root_path = RelativePathBuf::from("");
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let arr = store
.get_objects(Some(&root_path))
.await?
Expand Down
36 changes: 15 additions & 21 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use crate::metrics;
use crate::rbac;
use crate::rbac::role::Action;
use crate::storage;
use crate::storage::object_storage::ingester_metadata_path;
use crate::storage::object_storage::parseable_json_path;
use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
use crate::sync;

use super::server::Server;
Expand Down Expand Up @@ -101,6 +102,17 @@ impl ParseableServer for IngestServer {
/// implement the init method will just invoke the initialize method
async fn init(&self) -> anyhow::Result<()> {
self.validate()?;
// check for querier state. Is it there, or was it there in the past
self.check_querier_state().await?;
// to get the .parseable.json file in staging
self.validate_credentials().await?;

let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
// set the info in the global metadata
metadata.set_global();

self.initialize().await
}

Expand Down Expand Up @@ -180,13 +192,8 @@ impl IngestServer {
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();

// remove ip adn go with the domain name
let sock = Server::get_server_address();
let path = RelativePathBuf::from(format!(
"ingester.{}.{}.json",
sock.ip(), // this might be wrong
sock.port()
));
let path = ingester_metadata_path(sock.ip().to_string(), sock.port().to_string());

if store.get_object(&path).await.is_ok() {
println!("Ingester metadata already exists");
Expand Down Expand Up @@ -228,7 +235,7 @@ impl IngestServer {
// i.e the querier will create the `.parseable.json` file

let store = CONFIG.storage().get_object_store();
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
let path = parseable_json_path();

match store.get_object(&path).await {
Ok(_) => Ok(()),
Expand Down Expand Up @@ -271,19 +278,6 @@ impl IngestServer {
}

async fn initialize(&self) -> anyhow::Result<()> {
// check for querier state. Is it there, or was it there in the past
self.check_querier_state().await?;
// to get the .parseable.json file in staging
self.validate_credentials().await?;

let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;

rbac::map::init(&metadata);

// set the info in the global metadata
metadata.set_global();

if let Some(cache_manager) = LocalCacheManager::global() {
cache_manager
.validate(CONFIG.parseable.local_cache_size)
Expand Down
22 changes: 9 additions & 13 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ impl ParseableServer for QueryServer {
/// implementation of init should just invoke a call to initialize
async fn init(&self) -> anyhow::Result<()> {
self.validate()?;
migration::run_file_migration(&CONFIG).await?;
CONFIG.validate_storage().await?;
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
// initialize the rbac map
rbac::map::init(&metadata);
// keep metadata info in mem
metadata.set_global();
self.initialize().await
}

Expand Down Expand Up @@ -165,18 +174,6 @@ impl QueryServer {

/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;

let metadata = storage::resolve_parseable_metadata().await?;

banner::print(&CONFIG, &metadata).await;

// initialize the rbac map
rbac::map::init(&metadata);

// keep metadata info in mem
metadata.set_global();

let prometheus = metrics::build_metrics_handler();
CONFIG.storage().register_store_metrics(&prometheus);

Expand All @@ -189,7 +186,6 @@ impl QueryServer {

// track all parquet files already in the data directory
storage::retention::load_retention_from_global();

// load data from stats back to prometheus metrics
metrics::fetch_stats_from_storage().await;

Expand Down
13 changes: 7 additions & 6 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ impl ParseableServer for Server {
/// implementation of init should just invoke a call to initialize
async fn init(&self) -> anyhow::Result<()> {
self.validate()?;
migration::run_file_migration(&CONFIG).await?;
CONFIG.validate_storage().await?;
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
metadata.set_global();
self.initialize().await
}

Expand Down Expand Up @@ -405,12 +412,6 @@ impl Server {
}

async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
metadata.set_global();

if let Some(cache_manager) = LocalCacheManager::global() {
cache_manager
.validate(CONFIG.parseable.local_cache_size)
Expand Down
1 change: 0 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
CONFIG.validate_storage().await?;

// these are empty ptrs so mem footprint should be minimal
let server: Arc<dyn ParseableServer> = match CONFIG.parseable.mode {
Expand Down
113 changes: 104 additions & 9 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ mod metadata_migration;
mod schema_migration;
mod stream_metadata_migration;

use std::fs::OpenOptions;
use std::{fs::OpenOptions, sync::Arc};

use bytes::Bytes;
use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::Serialize;

use crate::{
option::Config,
storage::{
object_storage::stream_json_path, ObjectStorage, ObjectStorageError,
PARSEABLE_METADATA_FILE_NAME, SCHEMA_FILE_NAME,
object_storage::{parseable_json_path, stream_json_path},
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
},
};

Expand Down Expand Up @@ -121,7 +123,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
.put_object(&path, to_bytes(&new_stream_metadata))
.await?;

let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]);
let schema_path =
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
let schema = storage.get_object(&schema_path).await?;
let schema = serde_json::from_slice(&schema).ok();
let map = schema_migration::v1_v3(schema)?;
Expand All @@ -133,7 +136,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
.put_object(&path, to_bytes(&new_stream_metadata))
.await?;

let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]);
let schema_path =
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
let schema = storage.get_object(&schema_path).await?;
let schema = serde_json::from_slice(&schema)?;
let map = schema_migration::v2_v3(schema)?;
Expand All @@ -153,7 +157,8 @@ fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
}

pub fn get_staging_metadata(config: &Config) -> anyhow::Result<Option<serde_json::Value>> {
let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
let path = parseable_json_path().to_path(config.staging_dir());

let bytes = match std::fs::read(path) {
Ok(bytes) => bytes,
Err(err) => match err.kind() {
Expand All @@ -162,13 +167,14 @@ pub fn get_staging_metadata(config: &Config) -> anyhow::Result<Option<serde_json
},
};
let meta: serde_json::Value = serde_json::from_slice(&bytes).unwrap();

Ok(Some(meta))
}

async fn get_storage_metadata(
storage: &dyn ObjectStorage,
) -> anyhow::Result<Option<serde_json::Value>> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
let path = parseable_json_path();
match storage.get_object(&path).await {
Ok(bytes) => Ok(Some(
serde_json::from_slice(&bytes).expect("parseable config is valid json"),
Expand All @@ -187,13 +193,13 @@ pub async fn put_remote_metadata(
storage: &dyn ObjectStorage,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
let path = parseable_json_path();
let metadata = serde_json::to_vec(metadata)?.into();
Ok(storage.put_object(&path, metadata).await?)
}

pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> {
let path = config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
let path = parseable_json_path().to_path(config.staging_dir());
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
Expand All @@ -202,3 +208,92 @@ pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> an
serde_json::to_writer(&mut file, metadata)?;
Ok(())
}

pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> {
let object_store = config.storage().get_object_store();

let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);

// if this errors that means migrations is already done
if let Err(err) = object_store.get_object(&old_meta_file_path).await {
if matches!(err, ObjectStorageError::NoSuchKey(_)) {
return Ok(());
}
return Err(err.into());
}

run_meta_file_migration(&object_store, old_meta_file_path).await?;
run_stream_files_migration(object_store).await?;

Ok(())
}

async fn run_meta_file_migration(
object_store: &Arc<dyn ObjectStorage + Send>,
old_meta_file_path: RelativePathBuf,
) -> anyhow::Result<()> {
log::info!("Migrating metadata files to new location");

// get the list of all meta files
let mut meta_files = object_store.get_ingester_meta_file_paths().await?;
meta_files.push(old_meta_file_path);

for file in meta_files {
match object_store.get_object(&file).await {
Ok(bytes) => {
// we can unwrap here because we know the file exists
let new_path = RelativePathBuf::from_iter([
PARSEABLE_ROOT_DIRECTORY,
file.file_name().unwrap(),
]);
object_store.put_object(&new_path, bytes).await?;
object_store.delete_object(&file).await?;
}
Err(err) => {
// if error is not a no such key error, something weird happened
// so return the error
if !matches!(err, ObjectStorageError::NoSuchKey(_)) {
return Err(err.into());
}
}
}
}

Ok(())
}

async fn run_stream_files_migration(
object_store: Arc<dyn ObjectStorage + Send>,
) -> anyhow::Result<()> {
let streams = object_store
.list_old_streams()
.await?
.into_iter()
.map(|stream| stream.name)
.collect_vec();

for stream in streams {
let paths = object_store.get_stream_file_paths(&stream).await?;

for path in paths {
match object_store.get_object(&path).await {
Ok(bytes) => {
let new_path = RelativePathBuf::from_iter([
stream.as_str(),
STREAM_ROOT_DIRECTORY,
path.file_name().unwrap(),
]);
object_store.put_object(&new_path, bytes).await?;
object_store.delete_object(&path).await?;
}
Err(err) => {
if !matches!(err, ObjectStorageError::NoSuchKey(_)) {
return Err(err.into());
}
}
}
}
}

Ok(())
}
4 changes: 2 additions & 2 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use crate::cli::Cli;
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
use crate::storage::object_storage::parseable_json_path;
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
pub const JOIN_COMMUNITY: &str =
Expand Down Expand Up @@ -102,7 +102,7 @@ impl Config {
// if the proper data directory is provided, or s3 bucket is provided etc
pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> {
let obj_store = self.storage.get_object_store();
let rel_path = relative_path::RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
let rel_path = parseable_json_path();

let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();

Expand Down
2 changes: 2 additions & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub use self::staging::StorageDir;
// metadata file names in a Stream prefix
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
pub const STREAM_ROOT_DIRECTORY: &str = ".stream";
pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
pub const SCHEMA_FILE_NAME: &str = ".schema";
pub const ALERT_FILE_NAME: &str = ".alert.json";
pub const MANIFEST_FILE: &str = "manifest.json";
Expand Down
Loading