Skip to content

Commit 3ea414e

Browse files
committed
Introduced the Index mode for Parseable server
1 parent 48bd263 commit 3ea414e

File tree

7 files changed

+38
-2
lines changed

7 files changed

+38
-2
lines changed

src/catalog/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ pub async fn remove_manifest_from_snapshot(
340340
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
341341
}
342342
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
343+
Mode::Index => unimplemented!()
343344
}
344345
}
345346

@@ -350,6 +351,7 @@ pub async fn get_first_event(
350351
) -> Result<Option<String>, ObjectStorageError> {
351352
let mut first_event_at: String = String::default();
352353
match PARSEABLE.options.mode {
354+
Mode::Index => unimplemented!(),
353355
Mode::All | Mode::Ingest => {
354356
// get current snapshot
355357
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();

src/handlers/http/middleware.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,25 @@ where
357357
Ok(res)
358358
})
359359
}
360+
361+
Mode::Index => {
362+
let accessable_endpoints = ["create", "delete"];
363+
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
364+
if !cond {
365+
Box::pin(async {
366+
Err(actix_web::error::ErrorUnauthorized(
367+
"Only Index API can be accessed in Index Mode",
368+
))
369+
})
370+
} else {
371+
let fut = self.service.call(req);
372+
373+
Box::pin(async move {
374+
let res = fut.await?;
375+
Ok(res)
376+
})
377+
}
378+
}
360379
}
361380
}
362381
}

src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::process::exit;
2+
13
/*
24
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
35
*
@@ -37,6 +39,10 @@ async fn main() -> anyhow::Result<()> {
3739
let server: Box<dyn ParseableServer> = match &PARSEABLE.options.mode {
3840
Mode::Query => Box::new(QueryServer),
3941
Mode::Ingest => Box::new(IngestServer),
42+
Mode::Index => {
43+
println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
44+
exit(0)
45+
},
4046
Mode::All => Box::new(Server),
4147
};
4248

src/option.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222
pub enum Mode {
2323
Query,
2424
Ingest,
25+
Index,
2526
#[default]
2627
All,
2728
}
@@ -128,6 +129,7 @@ pub mod validation {
128129
"query" => Ok(Mode::Query),
129130
"ingest" => Ok(Mode::Ingest),
130131
"all" => Ok(Mode::All),
132+
"index" => Ok(Mode::Index),
131133
_ => Err("Invalid MODE provided".to_string()),
132134
}
133135
}

src/parseable/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ impl Parseable {
243243
match self.options.mode {
244244
Mode::Query => "Distributed (Query)",
245245
Mode::Ingest => "Distributed (Ingest)",
246+
Mode::Index => "Distributed (Index)",
246247
Mode::All => "Standalone",
247248
}
248249
}

src/storage/object_storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ pub fn schema_path(stream_name: &str) -> RelativePathBuf {
820820

821821
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
822822
}
823-
Mode::All | Mode::Query => {
823+
Mode::All | Mode::Query | Mode::Index => {
824824
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
825825
}
826826
}
@@ -838,7 +838,7 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
838838
let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}",);
839839
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
840840
}
841-
Mode::Query | Mode::All => RelativePathBuf::from_iter([
841+
Mode::Query | Mode::All | Mode::Index => RelativePathBuf::from_iter([
842842
stream_name,
843843
STREAM_ROOT_DIRECTORY,
844844
STREAM_METADATA_FILE_NAME,

src/storage/store_metadata.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ pub async fn resolve_parseable_metadata(
159159
metadata.server_mode = PARSEABLE.options.mode;
160160
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
161161
},
162+
Mode::Index => {
163+
// if index server is started fetch the metadata from remote
164+
// update the server mode for local metadata
165+
metadata.server_mode = PARSEABLE.options.mode;
166+
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
167+
}
162168
}
163169
Ok(metadata)
164170
}

0 commit comments

Comments
 (0)