Refactor StorageSync #55
nitisht
left a comment
some comments
    pub fn list_streams(&self) -> Vec<String> {
        self.read().unwrap().keys().map(String::clone).collect()
    }
let's not add unwrap.
Unwrapping here is fine because the same thread does not hold any other lock. Some measures could be considered to handle poisoning across all the places where a read lock is needed.
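One possible way to handle poisoning without `unwrap` is sketched below. It is a minimal illustration, not the PR's code: `StreamMap`, its `u64` values, and the `insert` helper are hypothetical stand-ins for the real metadata map. It recovers the guard from a poisoned lock using the standard library's `PoisonError::into_inner`, which assumes the map is still usable after another thread panicked.

```rust
use std::collections::HashMap;
use std::sync::{PoisonError, RwLock};

// Hypothetical stand-in for the stream metadata map discussed in this thread.
pub struct StreamMap(RwLock<HashMap<String, u64>>);

impl StreamMap {
    pub fn new() -> Self {
        StreamMap(RwLock::new(HashMap::new()))
    }

    // Hypothetical helper, only here so the sketch can be exercised.
    pub fn insert(&self, name: &str, value: u64) {
        // If the lock is poisoned, take the guard anyway instead of panicking.
        let mut guard = self.0.write().unwrap_or_else(PoisonError::into_inner);
        guard.insert(name.to_string(), value);
    }

    pub fn list_streams(&self) -> Vec<String> {
        // Same recovery on the read side: no unwrap, no panic on poisoning.
        let guard = self.0.read().unwrap_or_else(PoisonError::into_inner);
        guard.keys().cloned().collect()
    }
}
```

Whether recovering from a poisoned lock is acceptable depends on whether a half-finished write could leave the map inconsistent; returning a `Result` to the caller is the stricter alternative.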
  impl Opt {
-     pub fn get_cache_path(&self, stream_name: &str) -> String {
-         format!("{}/{}", self.local_disk_path, stream_name)
+     pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
This function and the one below return exactly the same thing. Let's remove one of them.
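For reference, a single `PathBuf`-returning helper could look like the sketch below. The trimmed-down `Opt` struct and the `PathBuf` type of `local_disk_path` are assumptions made for illustration; the real struct has more fields and may store the path differently.

```rust
use std::path::PathBuf;

// Hypothetical, trimmed-down version of `Opt`; the field type is assumed.
pub struct Opt {
    pub local_disk_path: PathBuf,
}

impl Opt {
    // Builds the per-stream cache directory with `join` instead of `format!`,
    // so the platform's path separator is used.
    pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
        self.local_disk_path.join(stream_name)
    }
}
```

Note that `join` replaces the base path entirely when its argument is absolute, so a stream name starting with `/` would escape the local data directory unless it is validated first.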
    }
    let filename = file.file_name().unwrap().to_str().unwrap();
    let file_suffix = str::replacen(filename, ".", "/", 3);
    let s3_path = format!("{}/{}", stream, file_suffix);
Let's use the join approach everywhere.
The S3 path is the key for the S3 put-object call. It does not represent any local file; it is generated from the filenames of the files that are yet to be synced. It is fine for it to be a string.
right
    );
    }
    let filename = file.file_name().unwrap().to_str().unwrap();
    let file_suffix = str::replacen(filename, ".", "/", 3);
Let's avoid adding new unwraps.
The filename is a valid UTF-8 filename generated when the local sync happens. The only way this could fail is if someone created an invalid file in the tmp directory.
Someone could try this, we can't underestimate our users :)
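A non-panicking variant of the snippet under review might look like the sketch below. The function name `s3_key_for` and the example file name used with it are hypothetical; only the `replacen(…, 3)` step and the `stream/suffix` key layout come from the diff.

```rust
use std::path::Path;

// Derives the S3 object key from a local file path.
// Returns None when the path has no final component or the file name is not
// valid UTF-8, so the caller can skip or log the file instead of panicking.
pub fn s3_key_for(stream: &str, file: &Path) -> Option<String> {
    let filename = file.file_name()?.to_str()?;
    // Turn the first three dots of the file name into path separators.
    let file_suffix = filename.replacen('.', "/", 3);
    // The key is a plain string, not a local path, so `format!` is fine here.
    Some(format!("{}/{}", stream, file_suffix))
}
```

With a made-up file name such as `a.b.c.data.parquet` and the stream `app`, this yields `app/a/b/c/data.parquet`; a stray file with an invalid name in the tmp directory yields `None` rather than aborting the sync.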
This adds multiple improvements:
- Datafusion querying multiple prefixes
- Avoid dependency on external plugins for S3 connection.
- Ensure using complete file path inside each prefix (added in #55) to avoid listing calls
Co-authored-by: Satyam Singh <[email protected]>
Description
This PR refactors the code around StorageSync, switches to PathBuf wherever it is the more appropriate type for path handling, and changes the filename so that it includes the hostname for identification.
StorageSync is only ever required for the local sync cycle. During S3 sync, the code currently checks the top-level folders inside the local data directory to retrieve stream names and their respective paths. This is not ideal, so it is changed to go only through the streams returned by 'list_streams' in S3 storage. This partially fixes #54, but there should be more checks in place when loading streams.
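The idea of driving the S3 sync from the known stream list, rather than from whatever folders exist on disk, can be sketched roughly as follows. The helper `stream_dirs` and its signature are hypothetical and not taken from the PR; it only shows how each listed stream maps to a local directory.

```rust
use std::path::{Path, PathBuf};

// Hypothetical helper: pairs every stream name from `list_streams` with its
// directory under the local data directory. Folders on disk that do not
// belong to a listed stream are never visited.
pub fn stream_dirs(local_dir: &Path, streams: &[String]) -> Vec<(String, PathBuf)> {
    streams
        .iter()
        .map(|name| (name.clone(), local_dir.join(name)))
        .collect()
}
```

A caller would still need to check that each directory exists before reading it, which is one of the extra checks the description mentions.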
Changes
This PR has: