Changes required for Parseable Enterprise #1215
Conversation
Walkthrough
This pull request introduces a new public enterprise utility module along with functions for generating time filters and retrieving parquet file paths from an object storage system. It also adjusts module visibility in multiple parts of the codebase, simplifies log formatting in the stream processing module, and adds new asynchronous methods to the object storage interfaces.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant Fetcher as fetch_parquet_file_paths
    participant Filter as create_time_filter
    participant OS as ObjectStorage
    participant Collector as collect_manifest_files
    Caller->>Fetcher: Call fetch_parquet_file_paths(stream, time_range)
    Fetcher->>Filter: Call create_time_filter(time_range, partition, table)
    Filter-->>Fetcher: Return time filter expressions
    Fetcher->>OS: Retrieve snapshot and manifest URLs
    OS-->>Fetcher: Return matching objects
    Fetcher->>Collector: Spawn async tasks to collect manifest files
    Collector-->>Fetcher: Return list of manifests
    Fetcher->>Caller: Return mapping of parquet paths by date
```

```mermaid
sequenceDiagram
    participant Client
    participant Storage as ObjectStorage (S3, LocalFS, AzureBlob)
    Client->>Storage: Call get_buffered_reader(path)
    Storage-->>Client: Return BufReader (data stream)
    Client->>Storage: Call head(path)
    Storage-->>Client: Return ObjectMeta (metadata)
```
Actionable comments posted: 3
🧹 Nitpick comments (5)
src/storage/azure_blob.rs (2)
427-432: Unimplemented `get_buffered_reader` method
This stub method currently returns `unimplemented!()`. Consider implementing a basic version to match the functionality in other storage backends, such as S3. Otherwise, ensure proper error handling or documentation clarifying that it's not yet supported. Do you want me to propose a reference implementation that mirrors the approach in `s3.rs`?
433-435: Unimplemented `head` method
Without a working `head` implementation, calls to retrieve object metadata will fail on Azure storage. Consider returning a NotImplemented error or implementing the metadata retrieval logic.
src/storage/object_storage.rs (1)
79-83: Trait methods for buffered reading and metadata (API consistency)
Adding `get_buffered_reader` and `head` to the `ObjectStorage` trait enforces consistency across providers. Ensure that all existing implementations (e.g., localfs, azure, s3) follow through with a valid or at least a gracefully failing implementation.
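For reference, a minimal sketch of what these trait additions could look like. The path parameter, error type alias, and `async_trait` usage are assumptions for illustration, not the actual definitions in `object_storage.rs`:

```rust
// Hedged sketch only: the path parameter and error type are stand-ins,
// not the real signatures in object_storage.rs.
use async_trait::async_trait;
use object_store::{buffered::BufReader, ObjectMeta};
use relative_path::RelativePath;

// Stand-in for the crate's own error type.
type ObjectStorageError = object_store::Error;

#[async_trait]
pub trait ObjectStorage: Send + Sync {
    // ...existing methods elided...

    /// Stream an object through a buffered reader instead of loading it fully into memory.
    async fn get_buffered_reader(
        &self,
        path: &RelativePath,
    ) -> Result<BufReader, ObjectStorageError>;

    /// Fetch object metadata (size, last modified, e-tag) without downloading the body.
    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError>;
}
```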
src/enterprise/utils.rs (2)
63-135: `fetch_parquet_file_paths` for retrieving and filtering manifest files
This function merges snapshots, fetches manifest files, and applies time-based filters. Consider additional checks:
• Handling partial JSON parse failures gracefully (skip vs. abort).
• Ensuring date path parsing remains robust for different file naming conventions.
137-161: Async fetching of manifests in `collect_manifest_files`
Spawning tasks for each manifest file concurrently can improve performance. However, ensure partial failures (e.g., a single corrupted file) do not affect overall success. Consider adding timeouts or concurrency limits if the number of files is large; a bounded-concurrency sketch follows below.
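If a concurrency cap is desired, one option is to drive the fetches through a bounded stream rather than spawning one unbounded task per file. A rough sketch; the `fetch_manifest` helper and `ManifestFile` type are hypothetical stand-ins, not the actual code in `src/enterprise/utils.rs`:

```rust
// Sketch only: bounding concurrency when fetching many manifest files.
use futures::stream;
use futures::{StreamExt, TryStreamExt};

const MAX_IN_FLIGHT: usize = 16;

struct ManifestFile; // placeholder for the real manifest type

async fn fetch_manifest(url: String) -> Result<ManifestFile, std::io::Error> {
    // ...download and deserialize one manifest...
    let _ = url;
    todo!()
}

async fn collect_manifest_files(
    urls: Vec<String>,
) -> Result<Vec<ManifestFile>, std::io::Error> {
    // buffer_unordered keeps at most MAX_IN_FLIGHT requests running at once,
    // and try_collect surfaces the first failure instead of panicking.
    stream::iter(urls)
        .map(fetch_manifest)
        .buffer_unordered(MAX_IN_FLIGHT)
        .try_collect()
        .await
}
```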
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- src/enterprise/mod.rs (1 hunks)
- src/enterprise/utils.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/parseable/streams.rs (1 hunks)
- src/query/stream_schema_provider.rs (1 hunks)
- src/storage/azure_blob.rs (2 hunks)
- src/storage/localfs.rs (2 hunks)
- src/storage/object_storage.rs (2 hunks)
- src/storage/s3.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/enterprise/mod.rs
🔇 Additional comments (10)
src/lib.rs (1)
24-24: Module visibility changes and new enterprise module addition.
These changes make several internal modules public and add a new enterprise module. This expands the library's public API surface, which is aligned with the PR objectives for Parseable Enterprise.
Make sure that:
- These modules' public APIs are well-documented
- You've considered API stability implications
- The new enterprise module follows the same design patterns as other modules
Also applies to: 29-29, 41-41, 49-49
src/parseable/streams.rs (1)
516-517: Simplified warning log format.
The warning log has been simplified from a multi-line format to a single line, which makes the logging more concise.
src/query/stream_schema_provider.rs (1)
897-897:❓ Verification inconclusive
Made ManifestExt trait public.
This change makes the `ManifestExt` trait accessible outside its module, which is consistent with making the `query` module public in `lib.rs`. The trait provides useful methods for filtering and pruning data based on query conditions.
🏁 Script executed:
```shell
#!/bin/bash
# Check if the ManifestExt trait is being used in the new enterprise module
rg -A 2 -B 2 "ManifestExt" --include="*.rs" ./src/enterprise/
```

Length of output: 503
ManifestExt Trait Made Public – Further Verification Required
The change to make the `ManifestExt` trait public remains valid as it aligns with exposing the `query` module. However, the verification command intended to check its usage in the enterprise module returned an error due to an unsupported `--include` flag. Please manually verify or update the search command (for example, using `--glob "*.rs"`) to confirm that the trait is referenced as expected in the `src/enterprise/` code.
src/storage/localfs.rs (1)
31-31: Added new ObjectStore imports for buffered reading and metadata.
The import for `BufReader` and `ObjectMeta` supports the new methods being added to the `ObjectStorage` trait implementation.
38-41: Potential usage of newly imported `BufReader`
The `buffered::BufReader` import suggests upcoming buffered reading support, but it is not yet referenced in the existing code (beyond the added trait method). Ensure that usage is fully integrated and tested to avoid dead code or missing functionality.
36-37: New imports for trait enhancements
The import of `BufReader` and `ObjectMeta` is essential for the added methods (`get_buffered_reader` and `head`). This extension allows for more versatile and standardized object I/O operations in the trait.
40-43: Updated imports for buffered reading and object metadata retrieval
The addition of `BufReader`, `BackoffConfig`, and `ObjectMeta` imports aligns with new S3 functionality for metadata and streamed reads. Ensure consistent, modern error handling patterns across the code.
333-333: Removal of `LimitStore` wrapper
Switching the `client` field to use `AmazonS3` directly can simplify the architecture but may remove the concurrency or rate-limiting controls that `LimitStore` provided. Verify whether this is an intentional design choice and whether it affects performance under high load.
1-19: Initial imports for enterprise utilities
Necessary crates like `datafusion`, `itertools`, and `relative_path` are introduced. This setup appears aligned with the upcoming filtering and file path generation. Verify that none of these additions significantly increases build overhead or introduces overlapping functionality with existing imports.
20-61: `create_time_filter` for generating start/end expressions
The logic handles both partitioned and non-partitioned timestamps. Verify that the inclusive/exclusive bounds (`Bound::Included` vs. `Bound::Excluded`) match the desired time slice; see the sketch below. Also consider adding unit tests covering edge cases.
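To make the Included/Excluded distinction concrete, a small illustrative DataFusion expression pair. The column name and the millisecond literals are assumptions for illustration, not the actual `create_time_filter` internals:

```rust
// Illustration of the inclusive-start / exclusive-end convention discussed above.
use datafusion::prelude::{col, lit, Expr};

fn time_range_filter(start_millis: i64, end_millis: i64) -> Vec<Expr> {
    vec![
        // Bound::Included on the start: rows at exactly `start` are kept.
        col("p_timestamp").gt_eq(lit(start_millis)),
        // Bound::Excluded on the end: rows at exactly `end` are dropped,
        // so adjacent time windows never double-count a row.
        col("p_timestamp").lt(lit(end_millis)),
    ]
}
```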
3ea414e to 03687d6 (Compare)
03687d6 to 79321a4 (Compare)
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/catalog/mod.rs (1)
354-354: ⚠️ Potential issue: Replace `unimplemented!()` with a proper error handling approach
Similar to the previous comment, using `unimplemented!()` will cause a runtime panic if this code path is executed. Replace with a more graceful error handling approach:

```diff
- Mode::Index => unimplemented!(),
+ Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
+     std::io::Error::new(
+         std::io::ErrorKind::Unsupported,
+         "Index mode not yet implemented for get_first_event"
+     )
+ ))),
```
🧹 Nitpick comments (4)
src/main.rs (1)
42-45: Consider adding clarity about program termination
The implementation correctly informs users that indexing is an enterprise feature, but it might be helpful to explicitly mention that the program will exit after displaying this message.
Additionally, consider whether an exit code of 0 (which indicates successful execution) is the most appropriate choice for this scenario.
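One possible shape for this, shown only as an illustration; the message text, function name, and exit code are not taken from the PR:

```rust
// Sketch of the suggestion above: make termination explicit and use a
// non-zero exit code, since the process stops rather than serving requests.
use std::process::exit;

fn reject_index_mode() -> ! {
    eprintln!("Indexing is an enterprise feature; this server will now exit.");
    exit(64); // a non-zero code, instead of 0 which would signal success
}
```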
src/handlers/http/middleware.rs (1)
361-378: Mode::Index middleware implementation follows established patterns
The implementation correctly restricts access to endpoints except those containing "create" or "delete" when in Index mode, following the same pattern as other mode implementations.
However, you might want to consider documenting what "Index API" specifically refers to, either in a comment or in external documentation, to make it clearer for other developers what functionality is available in this mode.
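For documentation purposes, the gating rule could also be expressed as a small, testable predicate. A hypothetical sketch; the function name and the example paths are illustrative only:

```rust
// Minimal sketch of the gating rule described above, separated into a pure
// predicate so it can be unit-tested and documented. Not the actual middleware code.
fn index_mode_allows(path: &str) -> bool {
    // In Index mode, only endpoints that create or delete resources are served.
    path.contains("create") || path.contains("delete")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn gating_rule() {
        assert!(index_mode_allows("/api/v1/resource/create")); // illustrative path
        assert!(!index_mode_allows("/api/v1/query")); // illustrative path
    }
}
```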
src/storage/s3.rs (1)
330-330: Consider maintaining a concurrency limit as in the datafusion runtime setup.
The `construct_client` method now uses a direct `AmazonS3` client rather than the `LimitStore<AmazonS3>` wrapper. If you wish to preserve the concurrency-limiting behavior from the datafusion runtime in this path, reintroduce a concurrency wrapper (see the sketch below) or clarify whether this new usage is intentionally unlimited.
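If the cap should be preserved, `object_store`'s `LimitStore` can simply be reintroduced around the client. A sketch under assumed builder settings and an assumed limit of 32:

```rust
// Sketch of reintroducing the concurrency cap by wrapping AmazonS3 in LimitStore.
// The builder configuration and the limit of 32 are assumptions for illustration,
// not the PR's actual settings.
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::limit::LimitStore;

fn construct_limited_client(bucket: &str) -> object_store::Result<LimitStore<AmazonS3>> {
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name(bucket)
        .build()?;
    // At most 32 concurrent requests reach S3 through this wrapper.
    Ok(LimitStore::new(s3, 32))
}
```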
src/enterprise/utils.rs (1)
20-61: Ensure the time bounds align with the intended semantics.
Currently, the lower bound is inclusive and the upper bound is exclusive. Confirm that these match the expected time range filtering semantics.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- src/catalog/mod.rs (2 hunks)
- src/enterprise/mod.rs (1 hunks)
- src/enterprise/utils.rs (1 hunks)
- src/handlers/http/middleware.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/main.rs (2 hunks)
- src/option.rs (2 hunks)
- src/parseable/mod.rs (1 hunks)
- src/parseable/streams.rs (1 hunks)
- src/query/stream_schema_provider.rs (1 hunks)
- src/storage/azure_blob.rs (2 hunks)
- src/storage/localfs.rs (2 hunks)
- src/storage/object_storage.rs (4 hunks)
- src/storage/s3.rs (3 hunks)
- src/storage/store_metadata.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- src/enterprise/mod.rs
- src/query/stream_schema_provider.rs
- src/storage/azure_blob.rs
- src/lib.rs
- src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: coverage
🔇 Additional comments (12)
src/storage/store_metadata.rs (1)
162-167: Implementation looks good for Index mode
The implementation for Mode::Index correctly follows the same pattern as Mode::Ingest, updating the server mode and staging directory in the metadata.
src/main.rs (1)
1-2: Added import for process exit
Import correctly added for the new exit functionality.
src/option.rs (2)
25-25: Index mode added to Mode enum
The new Mode::Index variant is correctly added to the enum.
132-132: Parsing for "index" string correctly implemented
The mode function is properly updated to handle the new "index" string input.
src/parseable/mod.rs (1)
246-246: LGTM: Mode::Index variant correctly added
This change adds proper handling for the new `Mode::Index` variant, which now returns "Distributed (Index)" in the string representation of server modes.
src/storage/object_storage.rs (3)
79-83: LGTM: New storage interface methods enhance functionality
The addition of `get_buffered_reader` and `head` methods to the `ObjectStorage` trait provides more efficient ways to interact with stored objects. `get_buffered_reader` allows streaming data with buffering for better memory usage, while `head` enables checking metadata without downloading the entire object.
823-826: LGTM: Schema path generation updated for Index mode
The schema path generation function has been properly updated to include the new `Mode::Index` variant in its control flow.
841-846: LGTM: Stream JSON path generation updated for Index mode
The stream JSON path generation function has been properly updated to include the new `Mode::Index` variant in its control flow.
src/storage/localfs.rs (1)
107-125: LGTM: Proper error handling for unimplemented methods
The implementation of the new `get_buffered_reader` and `head` methods correctly returns meaningful error messages instead of causing runtime panics. This approach allows the code to gracefully handle these cases while indicating that the functionality is not yet supported. This implementation follows the best practice suggested in previous code reviews, providing clear error messages that will help developers understand why the operation failed.
src/storage/s3.rs (3)
40-43: Great addition of the BufReader and related imports.
These imports enable buffered read capabilities, which is crucial for streaming large objects efficiently. No immediate concerns here.
558-568: Check for potential error states and large object edge cases.
The `get_buffered_reader` method gracefully returns an error if the `head` call fails. This is correct. However, for extremely large objects, ensure that partial reads or network timeouts are handled appropriately upstream, especially in high-latency environments.
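For context, the head-then-buffered-read pattern with the `object_store` crate generally looks like the following sketch; it is a simplified illustration, not the exact code in `s3.rs`:

```rust
// Simplified sketch: path handling and error mapping are omitted.
use std::sync::Arc;

use object_store::{buffered::BufReader, path::Path, ObjectStore};

async fn buffered_reader(
    store: Arc<dyn ObjectStore>,
    path: &Path,
) -> object_store::Result<BufReader> {
    // `head` fails fast (e.g. NotFound) before any data is streamed.
    let meta = store.head(path).await?;
    // BufReader issues ranged GETs lazily as the caller reads, so large
    // objects are never held fully in memory.
    Ok(BufReader::new(store, &meta))
}
```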
569-571: Direct pass-through of metadata retrieval looks good.
This `head` method is straightforward and properly propagates errors. No issues found.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/enterprise/utils.rs (1)
157-157: ⚠️ Potential issue: Avoid panics caused by `.expect(...)`.
Deserialization errors in `.expect("Data is invalid for Manifest")` will panic if invalid data is encountered. Propagate the error upward or handle it gracefully to improve robustness. Suggested refactor:

```diff
 .map(|res| {
-    serde_json::from_slice(&res).expect("Data is invalid for Manifest")
+    serde_json::from_slice(&res).map_err(|e| {
+        ObjectStorageError::Generic(format!("Invalid Manifest data: {:?}", e))
+    })?
 })
```
🧹 Nitpick comments (2)
src/enterprise/utils.rs (2)
20-61: Add unit tests for time filter edge cases.
While the create_time_filter function appears conceptually sound, it would be good to have unit tests verifying behavior for boundary conditions (e.g., zero-width time ranges or invalid time partitions). This will improve reliability and reduce the risk of off-by-one or range-related errors.
63-159: Consider adding integration tests for parallel manifest fetching.
While the asynchronous fetch approach is straightforward, verifying concurrency performance and error handling in real-world conditions would be beneficial. An integration test or robust mocking could ensure correct handling of partial failures and network timeouts.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/catalog/mod.rs (2 hunks)
- src/enterprise/utils.rs (1 hunks)
- src/storage/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/catalog/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
🔇 Additional comments (2)
src/storage/mod.rs (2)
23-23: Import added for new error handling capability.
The import for `JoinError` from the `tokio::task` module is correctly added to support the new error variant.
259-260: Well-structured error variant for async task handling.
The new `JoinError` variant in the `ObjectStorageError` enum is properly implemented with the `#[from]` attribute for automatic conversion from `tokio::task::JoinError`. This addition enhances the error handling capabilities for async operations in the object storage system, which aligns with the PR's goal of implementing necessary changes for Parseable Enterprise.
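For illustration, this is the usual `thiserror` shape for such a variant; the surrounding variants here are placeholders, not the real enum definition:

```rust
// Sketch of the #[from] conversion described above, assuming the error enum
// derives thiserror::Error. Other variants are illustrative placeholders.
use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error)]
pub enum ObjectStorageError {
    #[error("generic error: {0}")]
    Generic(String),
    // `#[from]` lets `?` convert a failed tokio task join automatically:
    //   let manifest = handle.await?; // JoinError -> ObjectStorageError::JoinError
    #[error("task join error: {0}")]
    JoinError(#[from] JoinError),
}
```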
```rust
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
```
Check for potential out-of-bounds slice access.
Splitting on "/" and calling date.as_slice()[1..4] can panic if the path has fewer than four segments. Handle or verify the path structure before slicing to prevent a runtime panic.
Possible fix:
```diff
- let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+ if date.len() < 4 {
+     // Decide whether to skip this file or handle the error
+     return Err(
+         ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
+     );
+ }
+ let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
if date.len() < 4 {
    // Decide whether to skip this file or handle the error
    return Err(
        ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
    );
}
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
```
nikhilsinhaparseable
left a comment
looks good
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
- `utils` module for enhanced utilities that support time-based filtering and efficient file retrieval from storage.
- `Index` mode, restricting access to specific endpoints and improving middleware functionality.
- `Index` mode, providing clearer feedback.