Skip to content

Commit c318794

Browse files
add Prism mode
1 parent bf3db35 commit c318794

File tree

8 files changed

+34
-36
lines changed

8 files changed

+34
-36
lines changed

src/catalog/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,10 @@ pub async fn remove_manifest_from_snapshot(
343343
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
344344
}
345345
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
346-
Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
346+
Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
347347
std::io::Error::new(
348348
std::io::ErrorKind::Unsupported,
349-
"Can't remove manifest from within Index server",
349+
"Can't remove manifest from within Index or Prism server",
350350
),
351351
))),
352352
}
@@ -359,7 +359,7 @@ pub async fn get_first_event(
359359
) -> Result<Option<String>, ObjectStorageError> {
360360
let mut first_event_at: String = String::default();
361361
match PARSEABLE.options.mode {
362-
Mode::Index => unimplemented!(),
362+
Mode::Index | Mode::Prism => unimplemented!(),
363363
Mode::All | Mode::Ingest => {
364364
// get current snapshot
365365
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();

src/handlers/http/middleware.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ where
358358
})
359359
}
360360

361-
Mode::Index => {
361+
Mode::Index | Mode::Prism => {
362362
let fut = self.service.call(req);
363363

364364
Box::pin(async move {

src/handlers/http/modal/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,7 @@ impl NodeType {
230230

231231
impl fmt::Display for NodeType {
232232
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233-
match self {
234-
NodeType::Ingestor => write!(f, "ingestor"),
235-
NodeType::Indexer => write!(f, "indexer"),
236-
NodeType::Querier => write!(f, "querier"),
237-
}
233+
write!(f, "{}", self.as_str())
238234
}
239235
}
240236

src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ async fn main() -> anyhow::Result<()> {
4343
println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
4444
exit(0)
4545
}
46+
Mode::Prism => {
47+
println!("Prism is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
48+
exit(0)
49+
}
4650
Mode::All => Box::new(Server),
4751
};
4852

src/option.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub enum Mode {
2323
Query,
2424
Ingest,
2525
Index,
26+
Prism,
2627
#[default]
2728
All,
2829
}

src/parseable/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ impl Parseable {
282282
Mode::Query => "Distributed (Query)",
283283
Mode::Ingest => "Distributed (Ingest)",
284284
Mode::Index => "Distributed (Index)",
285+
Mode::Prism => "Distributed (Prism)",
285286
Mode::All => "Standalone",
286287
}
287288
}

src/storage/object_storage.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -895,40 +895,36 @@ pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
895895
}
896896

897897
pub fn schema_path(stream_name: &str) -> RelativePathBuf {
898-
match &PARSEABLE.options.mode {
899-
Mode::Ingest => {
900-
let id = PARSEABLE
901-
.ingestor_metadata
902-
.as_ref()
903-
.expect(INGESTOR_EXPECT)
904-
.get_node_id();
905-
let file_name = format!(".ingestor.{id}{SCHEMA_FILE_NAME}");
906-
907-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
908-
}
909-
Mode::All | Mode::Query | Mode::Index => {
910-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
911-
}
898+
if PARSEABLE.options.mode == Mode::Ingest {
899+
let id = PARSEABLE
900+
.ingestor_metadata
901+
.as_ref()
902+
.expect(INGESTOR_EXPECT)
903+
.get_node_id();
904+
let file_name = format!(".ingestor.{id}{SCHEMA_FILE_NAME}");
905+
906+
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
907+
} else {
908+
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
912909
}
913910
}
914911

915912
#[inline(always)]
916913
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
917-
match &PARSEABLE.options.mode {
918-
Mode::Ingest => {
919-
let id = PARSEABLE
920-
.ingestor_metadata
921-
.as_ref()
922-
.expect(INGESTOR_EXPECT)
923-
.get_node_id();
924-
let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}",);
925-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
926-
}
927-
Mode::Query | Mode::All | Mode::Index => RelativePathBuf::from_iter([
914+
if PARSEABLE.options.mode == Mode::Ingest {
915+
let id = PARSEABLE
916+
.ingestor_metadata
917+
.as_ref()
918+
.expect(INGESTOR_EXPECT)
919+
.get_node_id();
920+
let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}",);
921+
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
922+
} else {
923+
RelativePathBuf::from_iter([
928924
stream_name,
929925
STREAM_ROOT_DIRECTORY,
930926
STREAM_METADATA_FILE_NAME,
931-
]),
927+
])
932928
}
933929
}
934930

src/storage/store_metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ pub async fn resolve_parseable_metadata(
159159
metadata.server_mode = PARSEABLE.options.mode;
160160
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
161161
},
162-
Mode::Index => {
162+
Mode::Index | Mode::Prism => {
163163
// if index server is started fetch the metadata from remote
164164
// update the server mode for local metadata
165165
metadata.server_mode = PARSEABLE.options.mode;

0 commit comments

Comments
 (0)