Skip to content

Commit 44fec77

Browse files
authored
Merge branch 'main' into add-stream-creation-time
2 parents adbae20 + b026b40 commit 44fec77

File tree

10 files changed

+79
-42
lines changed

10 files changed

+79
-42
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
<div align="center">
1212

1313
[![Docker Pulls](https://img.shields.io/docker/pulls/parseable/parseable?logo=docker&label=Docker%20Pulls)](https://hub.docker.com/r/parseable/parseable)
14-
[![Slack](https://img.shields.io/badge/slack-brightgreen.svg?logo=slack&label=Community&style=flat&color=%2373DC8C&)](https://launchpass.com/parseable)
15-
[![Docs](https://img.shields.io/badge/stable%20docs-parseable.io%2Fdocs-brightgreen?style=flat&color=%2373DC8C&label=Docs)](https://www.parseable.io/docs)
14+
[![Slack](https://img.shields.io/badge/slack-brightgreen.svg?logo=slack&label=Community&style=flat&color=%2373DC8C&)](https://logg.ing/community)
15+
[![Docs](https://img.shields.io/badge/stable%20docs-parseable.io%2Fdocs-brightgreen?style=flat&color=%2373DC8C&label=Docs)](https://logg.ing/docs)
1616
[![Build](https://img.shields.io/github/checks-status/parseablehq/parseable/main?style=flat&color=%2373DC8C&label=Checks)](https://github.com/parseablehq/parseable/actions)
1717

1818
[Key Concepts](https://www.parseable.io/docs/concepts) | [Features](https://github.com/parseablehq/parseable#rocket-highlights) | [Documentation](https://www.parseable.io/docs) | [Demo](https://demo.parseable.com/login?q=eyJ1c2VybmFtZSI6ImFkbWluIiwicGFzc3dvcmQiOiJhZG1pbiJ9) | [Integrations](https://www.parseable.io/docs/category/integrations) | [FAQ](https://www.parseable.io/docs/faq)

server/src/about.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub fn print_about(
104104
eprintln!(
105105
"
106106
Commit: \"{commit_hash}\"
107-
Docs: \"https://www.parseable.io/docs\""
107+
Docs: \"https://logg.ing/docs\""
108108
);
109109
}
110110

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ use crate::localcache::LocalCacheManager;
5555
async fn main() -> anyhow::Result<()> {
5656
env_logger::init();
5757
let storage = CONFIG.storage().get_object_store();
58+
CONFIG.validate().await?;
5859
migration::run_metadata_migration(&CONFIG).await?;
5960
let metadata = storage::resolve_parseable_metadata().await?;
60-
CONFIG.validate_staging()?;
6161
banner::print(&CONFIG, &metadata).await;
6262
rbac::map::init(&metadata);
6363
metadata.set_global();

server/src/migration.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use crate::{
3232
storage::{ObjectStorage, ObjectStorageError},
3333
};
3434

35+
/// Migrate the metdata from v1 or v2 to v3
36+
/// This is a one time migration
3537
pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
3638
let object_store = config.storage().get_object_store();
3739
let storage_metadata = get_storage_metadata(&*object_store).await?;

server/src/option.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ use std::sync::Arc;
2727
use url::Url;
2828

2929
use crate::oidc::{self, OpenidConfig};
30-
use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
31-
use crate::utils::validate_path_is_writeable;
30+
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
3231

3332
pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
34-
33+
pub const JOIN_COMMUNITY: &str =
34+
"Join us on Parseable Slack community for questions : https://logg.ing/community";
3535
pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));
3636

3737
#[derive(Debug)]
@@ -99,9 +99,31 @@ impl Config {
9999
}
100100
}
101101

102-
pub fn validate_staging(&self) -> anyhow::Result<()> {
103-
let staging_path = self.staging_dir();
104-
validate_path_is_writeable(staging_path)
102+
pub async fn validate(&self) -> Result<(), ObjectStorageError> {
103+
let obj_store = self.storage.get_object_store();
104+
let rel_path = relative_path::RelativePathBuf::from(".parseable.json");
105+
106+
let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();
107+
108+
// Lists all the directories in the root of the bucket/directory
109+
// can be a stream (if it contains .stream.json file) or not
110+
let has_dirs = match obj_store.list_dirs().await {
111+
Ok(dirs) => !dirs.is_empty(),
112+
Err(_) => false,
113+
};
114+
115+
let has_streams = obj_store.list_streams().await.is_ok();
116+
117+
if has_streams || !has_dirs && !has_parseable_json {
118+
return Ok(());
119+
}
120+
121+
if self.mode_string() == "Local drive" {
122+
return Err(ObjectStorageError::Custom(format!("Could not start the server because directory '{}' contains stale data, please use an empty directory, and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY)));
123+
}
124+
125+
// S3 bucket mode
126+
Err(ObjectStorageError::Custom(format!("Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY)))
105127
}
106128

107129
pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {
@@ -159,7 +181,7 @@ fn parseable_cli_command() -> Command {
159181
.next_line_help(false)
160182
.help_template(
161183
r#"
162-
{about} Join the community at https://launchpass.com/parseable.
184+
{about} Join the community at https://logg.ing/community.
163185
164186
{all-args}
165187
"#,

server/src/storage/localfs.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use tokio::fs::{self, DirEntry};
3232
use tokio_stream::wrappers::ReadDirStream;
3333

3434
use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
35-
use crate::{option::validation, utils::validate_path_is_writeable};
35+
use crate::option::validation;
3636

3737
use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
3838

@@ -139,8 +139,8 @@ impl ObjectStorage for LocalFS {
139139
}
140140

141141
async fn check(&self) -> Result<(), ObjectStorageError> {
142-
fs::create_dir_all(&self.root).await?;
143-
validate_path_is_writeable(&self.root)
142+
fs::create_dir_all(&self.root)
143+
.await
144144
.map_err(|e| ObjectStorageError::UnhandledError(e.into()))
145145
}
146146

@@ -169,6 +169,23 @@ impl ObjectStorage for LocalFS {
169169
Ok(logstreams)
170170
}
171171

172+
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
173+
let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
174+
.try_collect::<Vec<DirEntry>>()
175+
.await?
176+
.into_iter()
177+
.map(dir_name);
178+
179+
let dirs = FuturesUnordered::from_iter(dirs)
180+
.try_collect::<Vec<_>>()
181+
.await?
182+
.into_iter()
183+
.flatten()
184+
.collect::<Vec<_>>();
185+
186+
Ok(dirs)
187+
}
188+
172189
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
173190
let path = self.root.join(stream_name);
174191
let directories = ReadDirStream::new(fs::read_dir(&path).await?);

server/src/storage/object_storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub trait ObjectStorage: Sync + 'static {
7575
async fn check(&self) -> Result<(), ObjectStorageError>;
7676
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
7777
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
78+
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
7879
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
7980
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
8081

server/src/storage/s3.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,18 @@ impl ObjectStorage for S3 {
470470
fn store_url(&self) -> url::Url {
471471
url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
472472
}
473+
474+
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
475+
let pre = object_store::path::Path::from("/");
476+
let resp = self.client.list_with_delimiter(Some(&pre)).await?;
477+
478+
Ok(resp
479+
.common_prefixes
480+
.iter()
481+
.flat_map(|path| path.parts())
482+
.map(|name| name.as_ref().to_string())
483+
.collect::<Vec<_>>())
484+
}
473485
}
474486

475487
impl From<object_store::Error> for ObjectStorageError {

server/src/storage/store_metadata.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use once_cell::sync::OnceCell;
2626
use std::io;
2727

2828
use crate::{
29-
option::CONFIG,
29+
option::{CONFIG, JOIN_COMMUNITY},
3030
rbac::{role::model::DefaultPrivilege, user::User},
3131
storage::ObjectStorageError,
3232
utils::uid,
@@ -92,8 +92,8 @@ impl StorageMetadata {
9292
}
9393
}
9494

95-
// always returns remote metadata as it is source of truth
96-
// overwrites staging metadata while updating storage info
95+
/// always returns remote metadata as it is source of truth
96+
/// overwrites staging metadata while updating storage info
9797
pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
9898
let staging_metadata = get_staging_metadata()?;
9999
let storage = CONFIG.storage().get_object_store();
@@ -145,11 +145,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
145145
};
146146

147147
let metadata = res.map_err(|err| {
148-
let err = format!(
149-
"{}. {}",
150-
err,
151-
"Join us on Parseable Slack to report this incident : https://launchpass.com/parseable"
152-
);
148+
let err = format!("{}. {}", err, JOIN_COMMUNITY);
153149
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into();
154150
ObjectStorageError::UnhandledError(err)
155151
})?;
@@ -168,7 +164,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
168164
// variant contain remote metadata
169165
#[derive(Debug, Clone, PartialEq, Eq)]
170166
pub enum EnvChange {
171-
/// No change in env i.e both staging and remote have same id
167+
/// No change in env i.e both staging and remote have same id
172168
/// or deployment id of staging is not matching with that of remote
173169
None(StorageMetadata),
174170
/// Metadata not found in storage. Treated as possible misconfiguration on user side.

server/src/utils.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ pub mod json;
2323
pub mod uid;
2424
pub mod update;
2525

26-
use std::path::Path;
27-
2826
use chrono::{DateTime, NaiveDate, Timelike, Utc};
2927

3028
#[allow(dead_code)]
@@ -43,17 +41,6 @@ pub fn capitalize_ascii(s: &str) -> String {
4341
s[0..1].to_uppercase() + &s[1..]
4442
}
4543

46-
pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> {
47-
let Ok(md) = std::fs::metadata(path) else {
48-
anyhow::bail!("Could not read metadata for staging dir")
49-
};
50-
let permissions = md.permissions();
51-
if permissions.readonly() {
52-
anyhow::bail!("Staging directory {} is not writable", path.display())
53-
}
54-
Ok(())
55-
}
56-
5744
/// Convert minutes to a slot range
5845
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
5946
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
@@ -263,7 +250,7 @@ mod tests {
263250
]
264251
)]
265252
#[case::same_hour_with_00_to_59_minute_block(
266-
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
253+
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
267254
&["date=2022-06-11/hour=16/"]
268255
)]
269256
#[case::same_date_different_hours_coherent_minute(
@@ -274,37 +261,37 @@ mod tests {
274261
]
275262
)]
276263
#[case::same_date_different_hours_incoherent_minutes(
277-
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
264+
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
278265
&[
279266
"date=2022-06-11/hour=15/minute=59/",
280267
"date=2022-06-11/hour=16/minute=00/"
281268
]
282269
)]
283270
#[case::same_date_different_hours_whole_hours_between_incoherent_minutes(
284-
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
271+
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
285272
&[
286273
"date=2022-06-11/hour=15/minute=59/",
287274
"date=2022-06-11/hour=16/",
288275
"date=2022-06-11/hour=17/minute=00/"
289276
]
290277
)]
291278
#[case::different_date_coherent_hours_and_minutes(
292-
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
279+
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
293280
&[
294281
"date=2022-06-11/",
295282
"date=2022-06-12/"
296283
]
297284
)]
298285
#[case::different_date_incoherent_hours_coherent_minutes(
299-
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
286+
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
300287
&[
301288
"date=2022-06-11/hour=23/",
302289
"date=2022-06-12/hour=00/",
303290
"date=2022-06-12/hour=01/"
304291
]
305292
)]
306293
#[case::different_date_incoherent_hours_incoherent_minutes(
307-
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
294+
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
308295
&[
309296
"date=2022-06-11/hour=23/minute=59/",
310297
"date=2022-06-12/hour=00/minute=00/"

0 commit comments

Comments
 (0)