Skip to content

Commit 79321a4

Browse files
committed
Introduced the Index mode for Parseable server
1 parent b776c68 commit 79321a4

File tree

10 files changed

+64
-8
lines changed

10 files changed

+64
-8
lines changed

src/catalog/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ pub async fn remove_manifest_from_snapshot(
340340
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
341341
}
342342
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
343+
Mode::Index => unimplemented!(),
343344
}
344345
}
345346

@@ -350,6 +351,7 @@ pub async fn get_first_event(
350351
) -> Result<Option<String>, ObjectStorageError> {
351352
let mut first_event_at: String = String::default();
352353
match PARSEABLE.options.mode {
354+
Mode::Index => unimplemented!(),
353355
Mode::All | Mode::Ingest => {
354356
// get current snapshot
355357
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();

src/handlers/http/middleware.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,25 @@ where
357357
Ok(res)
358358
})
359359
}
360+
361+
Mode::Index => {
362+
let accessable_endpoints = ["create", "delete"];
363+
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
364+
if !cond {
365+
Box::pin(async {
366+
Err(actix_web::error::ErrorUnauthorized(
367+
"Only Index API can be accessed in Index Mode",
368+
))
369+
})
370+
} else {
371+
let fut = self.service.call(req);
372+
373+
Box::pin(async move {
374+
let res = fut.await?;
375+
Ok(res)
376+
})
377+
}
378+
}
360379
}
361380
}
362381
}

src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::process::exit;
2+
13
/*
24
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
35
*
@@ -37,6 +39,10 @@ async fn main() -> anyhow::Result<()> {
3739
let server: Box<dyn ParseableServer> = match &PARSEABLE.options.mode {
3840
Mode::Query => Box::new(QueryServer),
3941
Mode::Ingest => Box::new(IngestServer),
42+
Mode::Index => {
43+
println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
44+
exit(0)
45+
}
4046
Mode::All => Box::new(Server),
4147
};
4248

src/option.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222
pub enum Mode {
2323
Query,
2424
Ingest,
25+
Index,
2526
#[default]
2627
All,
2728
}
@@ -128,6 +129,7 @@ pub mod validation {
128129
"query" => Ok(Mode::Query),
129130
"ingest" => Ok(Mode::Ingest),
130131
"all" => Ok(Mode::All),
132+
"index" => Ok(Mode::Index),
131133
_ => Err("Invalid MODE provided".to_string()),
132134
}
133135
}

src/parseable/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ impl Parseable {
243243
match self.options.mode {
244244
Mode::Query => "Distributed (Query)",
245245
Mode::Ingest => "Distributed (Ingest)",
246+
Mode::Index => "Distributed (Index)",
246247
Mode::All => "Standalone",
247248
}
248249
}

src/storage/azure_blob.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,20 @@ impl ObjectStorage for BlobStore {
428428
&self,
429429
_path: &RelativePath,
430430
) -> Result<BufReader, ObjectStorageError> {
431-
unimplemented!()
431+
Err(ObjectStorageError::UnhandledError(Box::new(
432+
std::io::Error::new(
433+
std::io::ErrorKind::Unsupported,
434+
"Buffered reader not implemented for Blob Storage yet",
435+
),
436+
)))
432437
}
433438
async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
434-
unimplemented!()
439+
Err(ObjectStorageError::UnhandledError(Box::new(
440+
std::io::Error::new(
441+
std::io::ErrorKind::Unsupported,
442+
"Head operation not implemented for Blob Storage yet",
443+
),
444+
)))
435445
}
436446

437447
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {

src/storage/localfs.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,20 @@ impl ObjectStorage for LocalFS {
108108
&self,
109109
_path: &RelativePath,
110110
) -> Result<BufReader, ObjectStorageError> {
111-
unimplemented!()
111+
Err(ObjectStorageError::UnhandledError(Box::new(
112+
std::io::Error::new(
113+
std::io::ErrorKind::Unsupported,
114+
"Buffered reader not implemented for LocalFS yet",
115+
),
116+
)))
112117
}
113118
async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
114-
unimplemented!()
119+
Err(ObjectStorageError::UnhandledError(Box::new(
120+
std::io::Error::new(
121+
std::io::ErrorKind::Unsupported,
122+
"Head operation not implemented for LocalFS yet",
123+
),
124+
)))
115125
}
116126
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
117127
let time = Instant::now();

src/storage/object_storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ pub fn schema_path(stream_name: &str) -> RelativePathBuf {
820820

821821
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
822822
}
823-
Mode::All | Mode::Query => {
823+
Mode::All | Mode::Query | Mode::Index => {
824824
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
825825
}
826826
}
@@ -838,7 +838,7 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
838838
let file_name = format!(".ingestor.{id}{STREAM_METADATA_FILE_NAME}",);
839839
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
840840
}
841-
Mode::Query | Mode::All => RelativePathBuf::from_iter([
841+
Mode::Query | Mode::All | Mode::Index => RelativePathBuf::from_iter([
842842
stream_name,
843843
STREAM_ROOT_DIRECTORY,
844844
STREAM_METADATA_FILE_NAME,

src/storage/s3.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,14 +560,14 @@ impl ObjectStorage for S3 {
560560
path: &RelativePath,
561561
) -> Result<BufReader, ObjectStorageError> {
562562
let path = &to_object_store_path(path);
563-
let meta = self.client.head(path).await.unwrap();
563+
let meta = self.client.head(path).await?;
564564

565565
let store: Arc<dyn ObjectStore> = Arc::new(self.client.clone());
566566
let buf = object_store::buffered::BufReader::new(store, &meta);
567567
Ok(buf)
568568
}
569569
async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
570-
Ok(self.client.head(&to_object_store_path(path)).await.unwrap())
570+
Ok(self.client.head(&to_object_store_path(path)).await?)
571571
}
572572

573573
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {

src/storage/store_metadata.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ pub async fn resolve_parseable_metadata(
159159
metadata.server_mode = PARSEABLE.options.mode;
160160
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
161161
},
162+
Mode::Index => {
163+
// if index server is started fetch the metadata from remote
164+
// update the server mode for local metadata
165+
metadata.server_mode = PARSEABLE.options.mode;
166+
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
167+
}
162168
}
163169
Ok(metadata)
164170
}

0 commit comments

Comments
 (0)