feat(core): Implement RollingFileWriter to help split data into multiple files
#1547
Conversation
liurenjie1024
left a comment
Thanks @CTTY for this PR, just finished the first round of review.
if self.should_roll(input_size) {
    if let Some(inner) = self.inner.take() {
        // close the current writer, roll to a new file
        let handle = spawn(async move { inner.close().await });
This is an interesting optimization, but I would suggest not doing it for now. A writer usually consumes resources like memory, connections, etc. Closing writers asynchronously makes things difficult to understand in production; for example, we may end up with many unclosed writers that consume a lot of memory and lead to OOM.
Good point!
I have a rough idea to further improve this: we can use a config to control the maximum parallelism here:
struct RollingWriter {
    ...
    buffer: Vec<DataFileBuilder>,
}
...
while close_handles.len() >= self.max_parallelism() {
    // wait until some closers complete, and store the data files in a buffer
    self.buffer.extend(future::select(self.close_handles))
}
self.close_handles.push(new_handle);
I have not thought through how to prevent the buffer from eating up memory yet, or whether we even need it.
Either way, I agree this can be completed as a follow-up.
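As a slightly more concrete version of the sketch above, here is a minimal, self-contained take on the bounded-parallelism idea using futures::future::select_all; the struct layout, the DataFileBuilder placeholder, and the max_parallelism field are assumptions for illustration, not the actual RollingFileWriter API:

```rust
use futures::future::select_all;
use tokio::task::JoinHandle;

// Placeholder for whatever a closed inner writer produces.
struct DataFileBuilder;

struct RollingWriterSketch {
    // In-flight close tasks for writers that have already been rolled over.
    close_handles: Vec<JoinHandle<DataFileBuilder>>,
    // Results collected while waiting for capacity to free up.
    buffer: Vec<DataFileBuilder>,
    // Assumed to be >= 1.
    max_parallelism: usize,
}

impl RollingWriterSketch {
    async fn push_close_handle(&mut self, new_handle: JoinHandle<DataFileBuilder>) {
        // Block until the number of concurrent close tasks drops below the limit.
        while self.close_handles.len() >= self.max_parallelism {
            let handles = std::mem::take(&mut self.close_handles);
            // Resolves as soon as any one handle finishes; the rest are kept.
            let (finished, _idx, remaining) = select_all(handles).await;
            self.buffer.push(finished.expect("close task panicked"));
            self.close_handles = remaining;
        }
        self.close_handles.push(new_handle);
    }
}
```

This caps how many writers are being closed concurrently, but the buffer still grows with the number of rolled files, which is the open question above.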
I've removed the close_handles from this PR and created an issue to track this potential optimization: #1551
Memory control is a complex topic, and from what I've learned, simply allowing a fixed number of in-flight tasks doesn't work well when integrated into other systems. I would prefer not to spend too much time on this for now.
impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
    async fn write(&mut self, input: &RecordBatch) -> Result<()> {
        let input_size = input.get_array_memory_size();
This is incorrect: the written Parquet file is usually much smaller than Arrow's in-memory array size, since Parquet does a lot of encoding and compression. The target_size is not for exact control, so it's fine for a file to end up a little larger than this size.
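To illustrate the gap, here is a standalone sketch (not part of this PR) that compares a batch's in-memory size with the size of the Parquet bytes it produces; the repetitive data is chosen so the difference is obvious:

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Highly repetitive data encodes/compresses very well, so the written
    // size is far below the in-memory Arrow size here.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![42_i64; 1_000_000]))],
    )?;

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    println!("in-memory array size: {} bytes", batch.get_array_memory_size());
    println!("written parquet size: {} bytes", buf.len());
    Ok(())
}
```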
You are right, I added a comment to explain this.
No, I mean we should not use this input_size to determine whether to roll; we should use only the writer's current_written_size.
Hi @liurenjie1024, after testing with the suggested changes, I found an interesting issue.
tl;dr: the existing ParquetWriter can only report the correct current_written_size when it's closing and flushing data, not while writing data.
This can cause the following case to fail:
let writer: RollingWriter = ...
// should create 1 file,
// but won't update current_written_size because we don't close the writer in write()
writer.write(batch1).await?;
// this write should roll over, but since inner.current_written_size was not updated,
// it will write the data to the same file as the previous batch
writer.write(batch2).await?;
A more detailed analysis:
- ParquetWriter uses ArrowAsyncWriter as its inner writer
- ArrowAsyncWriter has an async_writer (ArrowRowGroupWriter) and a sync_writer (TrackWriter in this case)
- ArrowAsyncWriter's sync_writer will buffer rows based on the config value max_row_group_size (default is 1024 x 1024), so TrackWriter won't be able to track the data in the buffer until closing
Basically this issue can happen a lot when the max_row_group_size is large and the target_file_size is small.
To fix this, I think we'll need to change the ParquetWriter's implementation of current_file_size() and use AsyncArrowWriter's in_progress_size to take buffered data into account. But again, in_progress_size is the in-memory size, not the physical size.
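For context, the row-group size mentioned above is an ordinary writer property in the parquet crate; a small sketch of how it is configured (the 10_000 value is arbitrary, chosen only to contrast with the 1024 x 1024 default):

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // With the default max_row_group_size (1024 * 1024 rows), a small target
    // file size can fit entirely inside one still-buffered row group, so the
    // tracked written size stays at zero until the row group is flushed.
    let props = WriterProperties::builder()
        .set_max_row_group_size(10_000)
        .build();
    assert_eq!(props.max_row_group_size(), 10_000);
}
```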
> To fix this, I think we'll need to change the ParquetWriter's implementation of current_file_size() and use AsyncArrowWriter's in_progress_size to take buffered data into account. But again, in_progress_size is the in-memory size, not the physical size.
This sounds reasonable to me. According to the doc, in_progress_size + bytes_written seems like a better estimate of the current file size. Due to the complex encoding of Parquet, it's hard to get an accurate file size before finishing a row group, so an estimate is good enough.
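A minimal sketch of what that estimate could look like, assuming direct access to the underlying AsyncArrowWriter (the wrapper struct here is hypothetical, not iceberg-rust's actual ParquetWriter):

```rust
use parquet::arrow::AsyncArrowWriter;
use tokio::fs::File;

// Hypothetical wrapper; the real ParquetWriter wraps its Arrow writer differently.
struct ParquetWriterSketch {
    inner: AsyncArrowWriter<File>,
}

impl ParquetWriterSketch {
    /// Estimated physical file size: bytes already flushed to storage plus the
    /// in-memory size of the row group still being buffered. The second term is
    /// measured before encoding/compression, so this is only an estimate.
    fn current_file_size(&self) -> usize {
        self.inner.bytes_written() + self.inner.in_progress_size()
    }
}
```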
I've created an issue and will fix the parquet writer behavior in a separate PR
I've manually tested the rolling writer using some small data with the ParquetWriter::current_written_size fix. Now the generated file size is much closer to the configured target size; the difference between the configured target and the actual file size is small.
Co-authored-by: Renjie Liu <[email protected]>
Force-pushed from ed6b0eb to ac22f27.
Wondering what your plan is to make RollingFileWriter partition-aware.
Hi @stevie9868, I hope my reply in a different thread can answer your question.
Thanks @CTTY for the tests. The difference is acceptable to me.
liurenjie1024
left a comment
Thanks @CTTY for this PR!
Which issue does this PR close?
RollingFileWriter: Helps split incoming data into multiple files #1541

What changes are included in this PR?

RollingFileWriter

Are these changes tested?

added unit tests