Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::{S3Config, S3};
pub use store_metadata::StorageMetadata;

use self::store_metadata::{put_staging_metadata, startup_check, EnvChange};
use self::store_metadata::{put_staging_metadata, EnvChange};

/// local sync interval to move data.records to /tmp dir of that stream.
/// 60 sec is a reasonable value.
Expand Down Expand Up @@ -124,7 +124,15 @@ impl ObjectStoreFormat {
}

pub async fn resolve_parseable_metadata() -> Result<(), ObjectStorageError> {
let check = startup_check().await?;
let staging_metadata = store_metadata::get_staging_metadata()?;
let storage = CONFIG.storage().get_object_store();
let remote_metadata = storage.get_metadata().await?;

let check = store_metadata::check_metadata_conflict(
staging_metadata.as_ref(),
remote_metadata.as_ref(),
);

const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage";
let err: Option<&str> = match check {
EnvChange::None => None,
Expand All @@ -134,11 +142,15 @@ pub async fn resolve_parseable_metadata() -> Result<(), ObjectStorageError> {
Some("Could not start the server because metadata not found in storage")
}
EnvChange::NewStaging => {
Some("Could not start the server becuase metadata not found in staging")
put_staging_metadata(remote_metadata.as_ref().expect("remote metadata exists"))?;

// allow new staging directories
return Ok(());
}
EnvChange::CreateBoth => {
create_staging_metadata()?;
create_remote_metadata().await?;
let metadata = StorageMetadata::new();
create_remote_metadata(&metadata).await?;
create_staging_metadata(&metadata)?;
None
}
};
Expand All @@ -156,15 +168,14 @@ pub async fn resolve_parseable_metadata() -> Result<(), ObjectStorageError> {
}
}

async fn create_remote_metadata() -> Result<(), ObjectStorageError> {
async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> {
let client = CONFIG.storage().get_object_store();
client.put_metadata(&StorageMetadata::new()).await
client.put_metadata(metadata).await
}

fn create_staging_metadata() -> std::io::Result<()> {
fn create_staging_metadata(metadata: &StorageMetadata) -> std::io::Result<()> {
create_dir_all(CONFIG.staging_dir())?;
let metadata = StorageMetadata::new();
put_staging_metadata(&metadata)
put_staging_metadata(metadata)
}

lazy_static! {
Expand Down
18 changes: 5 additions & 13 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::io;

use crate::{option::CONFIG, utils::uid};

use super::{object_storage::PARSEABLE_METADATA_FILE_NAME, ObjectStorageError};
use super::object_storage::PARSEABLE_METADATA_FILE_NAME;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StorageMetadata {
Expand Down Expand Up @@ -61,17 +61,9 @@ impl StorageMetadata {
}
}

pub async fn startup_check() -> Result<EnvChange, ObjectStorageError> {
let staging_metadata = get_staging_metadata()?;
let storage = CONFIG.storage().get_object_store();
let remote_metadata = storage.get_metadata().await?;

Ok(check_metadata_conflict(staging_metadata, remote_metadata))
}

fn check_metadata_conflict(
staging_metadata: Option<StorageMetadata>,
remote_metadata: Option<StorageMetadata>,
pub fn check_metadata_conflict(
staging_metadata: Option<&StorageMetadata>,
remote_metadata: Option<&StorageMetadata>,
) -> EnvChange {
match (staging_metadata, remote_metadata) {
(Some(staging), Some(remote)) if staging.mode == remote.mode => {
Expand Down Expand Up @@ -100,7 +92,7 @@ pub enum EnvChange {
CreateBoth,
}

fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
let bytes = match fs::read(path) {
Ok(bytes) => bytes,
Expand Down