Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,29 +190,33 @@ impl Server {

// get the filters web scope
pub fn get_filters_webscope() -> Scope {
web::scope("/filters").service(
web::scope("/{user_id}")
.service(
web::resource("")
.route(web::get().to(filters::list).authorize(Action::ListFilter)),
)
.service(
web::scope("/{filter_id}").service(
web::resource("")
.route(web::get().to(filters::get).authorize(Action::GetFilter))
.route(
web::post()
.to(filters::post)
.authorize(Action::CreateFilter),
)
.route(
web::delete()
.to(filters::delete)
.authorize(Action::DeleteFilter),
),
),
web::scope("/filters")
.service(
web::resource("").route(
web::post()
.to(filters::post)
.authorize(Action::CreateFilter),
),
)
)
.service(
web::scope("/filter").service(
web::resource("/{filter_id}")
.route(web::get().to(filters::get).authorize(Action::GetFilter))
.route(
web::delete()
.to(filters::delete)
.authorize(Action::DeleteFilter),
)
.route(
web::put()
.to(filters::update)
.authorize(Action::CreateFilter),
),
),
)
.service(web::scope("/{user_id}").service(
web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)),
))
}

// get the query factory
Expand Down
130 changes: 49 additions & 81 deletions server/src/handlers/http/users/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ use crate::{
handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY},
option::CONFIG,
storage::{object_storage::filter_path, ObjectStorageError},
users::filters::{Filter, FILTERS},
users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS},
};
use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use http::StatusCode;
use serde_json::{Error as SerdeError, Value as JsonValue};
use rand::distributions::DistString;
use serde_json::Error as SerdeError;

pub async fn list(req: HttpRequest) -> Result<impl Responder, FiltersError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;

let stream_name = req
.headers()
.iter()
Expand All @@ -41,117 +41,85 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, FiltersError> {
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;
let filters = FILTERS.list_filters_by_user_and_stream(user_id, stream_name);

// .users/user_id/filters/stream_name/filter_id
let path = filter_path(user_id, stream_name, "");

let store = CONFIG.storage().get_object_store();
let filters = store
.get_objects(
Some(&path),
Box::new(|file_name: String| file_name.ends_with("json")),
)
.await?;

let mut filt = vec![];
for filter in filters {
filt.push(serde_json::from_slice::<JsonValue>(&filter)?)
}

Ok((web::Json(filt), StatusCode::OK))
Ok((web::Json(filters), StatusCode::OK))
}

pub async fn get(req: HttpRequest) -> Result<impl Responder, FiltersError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;

let filt_id = req
let filter_id = req
.match_info()
.get("filter_id")
.ok_or(FiltersError::Metadata("No Filter Id Provided"))?;

let stream_name = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;

if let Some(filter) = FILTERS.find(filt_id) {
if let Some(filter) = FILTERS.get_filter(filter_id) {
return Ok((web::Json(filter), StatusCode::OK));
}

// if it is not in memory go to s3
let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
let resource = CONFIG
.storage()
.get_object_store()
.get_object(&path)
.await?;
Err(FiltersError::Metadata("Filter Not Found"))
}

let resource = serde_json::from_slice::<Filter>(&resource)?;
pub async fn post(body: Bytes) -> Result<HttpResponse, PostError> {
let filter: Filter = serde_json::from_slice(&body)?;
let filter_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 10);
let user_id = &filter.user_id;
let stream_name = &filter.stream_name;
let mut cloned_filter = filter.clone();
cloned_filter.filter_id = Some(filter_id.clone());
cloned_filter.version = Some(CURRENT_FILTER_VERSION.to_string());
FILTERS.update(&cloned_filter);

Ok((web::Json(resource), StatusCode::OK))
}
let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id));

pub async fn post(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;
let store = CONFIG.storage().get_object_store();
let filter_bytes = serde_json::to_vec(&cloned_filter)?;
store.put_object(&path, Bytes::from(filter_bytes)).await?;

let filt_id = req
Ok(HttpResponse::Ok().finish())
}

pub async fn update(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let filter_id = req
.match_info()
.get("filter_id")
.ok_or(FiltersError::Metadata("No Filter Id Provided"))?;
let filter = FILTERS
.get_filter(filter_id)
.ok_or(FiltersError::Metadata("Filter Not Found"))?;
let user_id = &filter.user_id;
let stream_name = &filter.stream_name;

let stream_name = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;
let mut cloned_filter: Filter = serde_json::from_slice(&body)?;
cloned_filter.filter_id = Some(filter_id.to_string());
cloned_filter.version = Some(CURRENT_FILTER_VERSION.to_string());
FILTERS.update(&cloned_filter);

let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
let filter: Filter = serde_json::from_slice(&body)?;
FILTERS.update(filter);
let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id));

let store = CONFIG.storage().get_object_store();
store.put_object(&path, body).await?;
let filter_bytes = serde_json::to_vec(&cloned_filter)?;
store.put_object(&path, Bytes::from(filter_bytes)).await?;

Ok(HttpResponse::Ok().finish())
}

pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;

let filt_id = req
let filter_id = req
.match_info()
.get("filter_id")
.ok_or(FiltersError::Metadata("No Filter Id Provided"))?;
let filter = FILTERS
.get_filter(filter_id)
.ok_or(FiltersError::Metadata("Filter Not Found"))?;
let stream_name = &filter.stream_name;
let user_id = &filter.user_id;

let stream_name = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;

let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id));
let store = CONFIG.storage().get_object_store();
store.delete_object(&path).await?;

FILTERS.delete_filter(filter_id);

Ok(HttpResponse::Ok().finish())
}

Expand All @@ -161,7 +129,7 @@ pub enum FiltersError {
ObjectStorage(#[from] ObjectStorageError),
#[error("Serde Error: {0}")]
Serde(#[from] SerdeError),
#[error("Cannot perform this operation: {0}")]
#[error("Operation cannot be performed: {0}")]
Metadata(&'static str),
}

Expand Down
31 changes: 31 additions & 0 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,37 @@ impl ObjectStorage for LocalFS {
Ok(dirs)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
for user in users {
if !user.path().is_dir() {
continue;
}
let stream_root_path = users_root_path.join(user.path()).join("filters");
let directories = ReadDirStream::new(fs::read_dir(&stream_root_path).await?);
let streams: Vec<DirEntry> = directories.try_collect().await?;
for stream in streams {
if !stream.path().is_dir() {
continue;
}
let filters_path = users_root_path
.join(user.path())
.join("filters")
.join(stream.path());
let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
let filters_files: Vec<DirEntry> = directories.try_collect().await?;
for filter in filters_files {
let file = fs::read(filter.path()).await?;
filters.push(file.into());
}
}
}
Ok(filters)
}

async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
let path = self.root.join(stream_name);
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
Expand Down
1 change: 1 addition & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
Expand Down
44 changes: 44 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,50 @@ impl ObjectStorage for S3 {
.collect::<Vec<_>>())
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
.list_with_delimiter(Some(&users_root_path))
.await?;

let users = resp
.common_prefixes
.iter()
.flat_map(|path| path.parts())
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>();
for user in users {
let user_filters_path = object_store::path::Path::from(format!(
"{}/{}/{}",
USERS_ROOT_DIR, user, "filters"
));
let resp = self
.client
.list_with_delimiter(Some(&user_filters_path))
.await?;
let streams = resp
.common_prefixes
.iter()
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>();
for stream in streams {
let filters_path = RelativePathBuf::from(&stream);
let filter_bytes = self
.get_objects(
Some(&filters_path),
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
filters.extend(filter_bytes);
}
}
Ok(filters)
}

fn get_bucket_name(&self) -> String {
self.bucket.clone()
}
Expand Down
Loading