Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ use crate::localcache::LocalCacheManager;
async fn main() -> anyhow::Result<()> {
env_logger::init();
let storage = CONFIG.storage().get_object_store();
CONFIG.validate_staging()?;
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
CONFIG.validate_staging()?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
metadata.set_global();
Expand Down
36 changes: 22 additions & 14 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use clap::{command, value_parser, Arg, ArgGroup, Args, Command, FromArgMatches};

use once_cell::sync::Lazy;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use std::path::{Path, PathBuf};
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use url::Url;

Expand All @@ -43,15 +44,14 @@ pub struct Config {
impl Config {
fn new() -> Self {
let cli = parseable_cli_command().get_matches();

match cli.subcommand() {
Some(("local-store", m)) => {
let server = match Server::from_arg_matches(m) {
Ok(server) => server,
Err(err) => err.exit(),
};
let storage = match FSConfig::from_arg_matches(m) {
Ok(server) => server,
Ok(storage) => storage,
Err(err) => err.exit(),
};

Expand Down Expand Up @@ -85,7 +85,7 @@ impl Config {
Err(err) => err.exit(),
};
let storage = match S3Config::from_arg_matches(m) {
Ok(server) => server,
Ok(storage) => storage,
Err(err) => err.exit(),
};

Expand All @@ -108,7 +108,7 @@ impl Config {
self.storage.clone()
}

pub fn staging_dir(&self) -> &Path {
pub fn staging_dir(&self) -> &PathBuf {
&self.parseable.local_staging_path
}

Expand Down Expand Up @@ -611,12 +611,14 @@ impl From<Compression> for parquet::basic::Compression {

pub mod validation {
use std::{
fs::{canonicalize, create_dir_all},
env, io,
net::ToSocketAddrs,
path::PathBuf,
path::{Path, PathBuf},
str::FromStr,
};

use path_clean::PathClean;

use crate::option::MIN_CACHE_SIZE_BYTES;
use crate::storage::LOCAL_SYNC_INTERVAL;
use human_size::{multiples, SpecificSize};
Expand All @@ -634,16 +636,22 @@ pub mod validation {

Ok(path)
}
pub fn absolute_path(path: impl AsRef<Path>) -> io::Result<PathBuf> {
let path = path.as_ref();

let absolute_path = if path.is_absolute() {
path.to_path_buf()
} else {
env::current_dir()?.join(path)
}
.clean();

Ok(absolute_path)
}

pub fn canonicalize_path(s: &str) -> Result<PathBuf, String> {
let path = PathBuf::from(s);

create_dir_all(&path)
.map_err(|err| err.to_string())
.and_then(|_| {
canonicalize(&path)
.map_err(|_| "Cannot use the path provided as an absolute path".to_string())
})
Ok(absolute_path(path).unwrap())
}

pub fn socket_addr(s: &str) -> Result<String, String> {
Expand Down
2 changes: 1 addition & 1 deletion server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl ObjectStorage for LocalFS {
};
let to_path = self.root.join(key);
if let Some(path) = to_path.parent() {
fs::create_dir_all(path).await?
fs::create_dir_all(path).await?;
}
let _ = fs_extra::file::copy(path, to_path, &op)?;
Ok(())
Expand Down
13 changes: 5 additions & 8 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl StorageMetadata {
Self {
version: "v3".to_string(),
mode: CONFIG.storage_name.to_owned(),
staging: CONFIG.staging_dir().canonicalize().unwrap(),
staging: CONFIG.staging_dir().to_path_buf(),
storage: CONFIG.storage().get_endpoint(),
deployment_id: uid::gen(),
users: Vec::new(),
Expand Down Expand Up @@ -104,7 +104,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
if staging.deployment_id == remote.deployment_id {
EnvChange::None(remote)
} else {
EnvChange::DeploymentMismatch
EnvChange::NewRemote
}
}
(None, Some(remote)) => EnvChange::NewStaging(remote),
Expand All @@ -116,16 +116,14 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
let mut overwrite_staging = false;
let mut overwrite_remote = false;

const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage";
let res = match check {
EnvChange::None(metadata) => {
// overwrite staging anyways so that it matches remote in case of any divergence
overwrite_staging = true;
Ok(metadata)
}
EnvChange::DeploymentMismatch => Err(MISMATCH),
},
EnvChange::NewRemote => {
Err("Could not start the server because metadata not found in storage")
Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server")
}
EnvChange::NewStaging(mut metadata) => {
create_dir_all(CONFIG.staging_dir())?;
Expand Down Expand Up @@ -171,9 +169,8 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnvChange {
/// No change in env i.e both staging and remote have same id
/// or deployment id of staging is not matching with that of remote
None(StorageMetadata),
/// Mismatch in deployment id. Cannot use this staging for this remote
DeploymentMismatch,
/// Metadata not found in storage. Treated as possible misconfiguration on user side.
NewRemote,
/// If a new staging is found then we just copy remote metadata to this staging.
Expand Down