
Conversation

@bhalchandrap

Description of PR

I work for Pinterest. I developed a technique for vastly improving read throughput when reading from the S3 file system. It not only helps the sequential read case (like reading a SequenceFile) but also significantly improves read throughput of a random access case (like reading Parquet). This technique has been very useful in significantly improving efficiency of the data processing jobs at Pinterest.
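The technique splits a file into fixed-size blocks and asynchronously fetches the blocks ahead of the current read position, so sequential readers rarely wait on S3. A minimal sketch of the underlying block arithmetic (class and method names here are illustrative, not the PR's actual code):

```java
// A sketch of the block arithmetic behind prefetching, assuming a fixed
// block size; names are illustrative, not the PR's actual classes.
public final class PrefetchMath {

  private PrefetchMath() {
  }

  /** Index of the block containing the byte at {@code offset}. */
  public static long blockIndex(long offset, long blockSize) {
    return offset / blockSize;
  }

  /** Offset of the first byte of block {@code index}. */
  public static long blockStart(long index, long blockSize) {
    return index * blockSize;
  }

  /**
   * Indices of the blocks to queue for asynchronous prefetch after a read
   * touches block {@code current}, clipped to the end of the file.
   */
  public static long[] prefetchWindow(long current, int count, long numBlocks) {
    int n = (int) Math.min(count, Math.max(0, numBlocks - current - 1));
    long[] next = new long[n];
    for (int i = 0; i < n; i++) {
      next[i] = current + 1 + i;
    }
    return next;
  }
}
```

A sequential reader that has just consumed block i would then find the next few blocks already in memory by the time it needs them.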

I would like to contribute that feature to Apache Hadoop. More details on this technique are available in this blog I wrote recently:
https://medium.com/pinterest-engineering/improving-efficiency-and-reducing-runtime-using-s3-read-optimization-b31da4b60fa0

How was this patch tested?

Tested against S3 in us-east-1 region.

There are a small number of test failures (most within s3guard). I can attach test output if it helps.

@apache apache deleted a comment from hadoop-yetus Dec 3, 2021
@steveloughran steveloughran self-assigned this Dec 3, 2021
@steveloughran steveloughran left a comment


OK, I've done the first review.

  1. this review was done using dictation tools and single-handed typing; if a sentence doesn't make sense, that is why.
  2. I have looked more at integration than at the actual functionality.

Now that the code is being moved into the Hadoop codebase, it has to move to the existing classes in production and test code. This is to keep maintenance down in future.

It also needs to maintain the same statistics, auditing, etc. as the current stream, including CanUnbuffer, StreamCapabilities, IOStatisticsSource, change tracking. Sorry.

key changes will be

  • use an Invoker around all calls to S3.
  • use a callback to indirectly interact with the AWS S3 client, similar to S3AInputStream.InputStreamCallbacks. This is to support testing, maintenance and to allow us to annotate every request with audit headers.
  • collect detailed statistics on what is happening and provide these through the IOStatisticsSource API in the stream, updating the file system in close()
  • integrate with the openFile() API. That's not just a "would be nice" feature, it is a blocker: LineRecordReader uses that API to open files. If it is not wired up, you would not actually get prefetch on jobs which use that input source.

#2584 adds some standard options to openFile(); you can also add custom ones to make the stream parameters tunable on an invocation-by-invocation basis. That PR includes read strategies such as sequential and whole-file; these could be used to select specific cache options (none) as well as block sizes for the next reads.

I also think everything in org.apache.hadoop.fs.common should be moved into the hadoop-common module, in the package org.apache.hadoop.fs.store.
But I also worry about the imported Twitter classes: does this add a new dependency?

checkArgument(Files.isRegularFile(path), "Path %s (%s) must point to a file.", argName, path);
}

public static void checkArgument(boolean expression, String format, Object... args) {
Contributor:

We have our own version of guava preconditions in org.apache.hadoop.util; please use.

Author:

why use guava (like) functionality when I am already using Apache Validate?
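For reference, a minimal standalone version of the helper being discussed might look like the following. This is a sketch, not the PR's actual code; it uses String.format for the message, while Hadoop's own shaded version lives in org.apache.hadoop.util.Preconditions:

```java
// Sketch of a Guava-style checkArgument helper, assuming String.format
// message expansion; illustrative only, not the PR's actual code.
public final class Validate {

  private Validate() {
  }

  /** Throw IllegalArgumentException with a formatted message if the check fails. */
  public static void checkArgument(boolean expression, String format, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(format, args));
    }
  }
}
```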


protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
createCopyFromLocalCallbacks() throws IOException {
createCopyFromLocalCallbacks() throws IOException {
Contributor:

omit these

Author:

sorry, I did not understand the suggestion. can you please clarify?

if (this.prefetchEnabled) {
return this.createPrefetchingInputStream(f, bufferSize);
} else {
return open(f, Optional.empty(), Optional.empty());
Contributor:

This must go in the open/3 method below, so that openFile() will also use it.
You will also get the benefit of the FileStatus retrieved during the existence checks, with length and etag; openFile() allows callers to pass it in so we can skip the HEAD request.

/**
* Provides an {@link InputStream} that allows reading from an S3 file.
*/
public abstract class S3InputStream extends InputStream {
Contributor:

everything which goes wrong here MUST raise an IOException.
The Invoker coming in on the S3AReadOpContext (which should be passed down in the constructor) will do this.
When you use it, declare the retry policy through the @Retries annotation; see S3AInputStream for the details.

String key,
long contentLength,
AmazonS3 client,
FileSystem.Statistics stats) {
Contributor:

Expect to be constructed with a reference to an org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics instance; a stub one can be supplied for testing.
The production one will update the file system statistics as appropriate.

@apache apache deleted a comment from hadoop-yetus Dec 17, 2021
@mehakmeet mehakmeet left a comment


Looks really good. Did an initial review, still going over it.

this.throwIfClosed();
this.throwIfInvalidSeek(pos);

if (!this.getFilePosition().setAbsolute(pos)) {
Contributor:

Can you add some comments in this section to explain what's happening here?

Author:

done.

}
}

// @VisibleForTesting
Contributor:

remove this?

Author:

done


@yzhangal yzhangal left a comment


Hi Kumar, thanks for the good work here. I'm submitting a partial review and will be doing more.

* This implementation provides improved read throughput by asynchronously prefetching
* blocks of configurable size from the underlying S3 file.
*/
public class S3EInputStream
Contributor:

Thanks for the good work here Kumar.

Suggest changing S3EInputStream to, say, S3AInputStreamWithPrefetch. It also logs S3A statistics and is implemented in S3AFileSystem, so it is better to have the S3A signature in its name rather than S3E. It would also be good to stick to the categorization described in https://web.archive.org/web/20170718025436/https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/

Contributor:

It would be nice to evaluate and describe this change's impact on the number of S3 accesses, so users can be aware of it. See https://issues.apache.org/jira/browse/HADOOP-14965?focusedCommentId=17489161&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17489161 for reference.

Author:

Agreed on the first point; I will rename it to a suitable name that avoids the E.

As for the second, the impact is negligible. Here is a back-of-the-envelope calculation:
for a 1 TB dataset and the default 8 MB block size, we will make 125,000 accesses. That translates to about a $0.05 increase in job cost (assuming AWS list prices: $0.0004 per 1,000 requests). The small increase is more than offset by the orders-of-magnitude reduction in runtime.
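The back-of-the-envelope figures can be checked mechanically (decimal units, matching the numbers quoted; the price is the AWS list price assumed in the comment):

```java
// Verifies the request-count and cost arithmetic above: a 1 TB dataset
// read in 8 MB blocks, priced at $0.0004 per 1,000 GET requests.
public final class CostEstimate {

  private CostEstimate() {
  }

  /** Number of block-sized GET requests needed to read the whole dataset. */
  public static long requests(long datasetBytes, long blockBytes) {
    return datasetBytes / blockBytes;
  }

  /** Total request cost in dollars at a given price per 1,000 requests. */
  public static double costUsd(long requests, double pricePer1000) {
    return requests / 1000.0 * pricePer1000;
  }
}
```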

Contributor:

I agree that saving time is generally the best thing to prioritise; if you are running in EC2, your CPU rental is a bigger cost. It does increase the risk of overloading the IO capacity of a shard in a bucket, but we have that with the current random IO implementation anyway.

private int prefetchBlockSize;

// Size of prefetch queue (in number of blocks).
private int prefetchBlockCount;
Contributor:

Suggest changing prefetchBlockCount to prefetchThreadCount.

Author:

I will keep the name as-is, because the number of prefetch threads is independent of the number of prefetch blocks.

// Call read and then unbuffer
FSDataInputStream stream = fs.open(path);
assertEquals(0, stream.read(new byte[8])); // mocks read 0 bytes
assertEquals(-1, stream.read(new byte[8])); // mocks read 0 bytes
Contributor:

Please add comment to explain why return value is -1 instead of 0 here

Author:

That is the correct behavior; see lines 62 and 63: a read of the underlying object stream returns -1, which should be reflected in the input stream's read.
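The behaviour being asserted is just the standard java.io.InputStream contract: read(byte[]) returns the number of bytes read, and -1 (never 0 for a non-empty buffer) once the stream is exhausted. A small self-contained illustration:

```java
import java.io.ByteArrayInputStream;

// Demonstrates the InputStream end-of-stream contract: the first read
// drains the data, the second returns -1.
public final class EofDemo {

  private EofDemo() {
  }

  /** Returns the results of two successive reads into a buffer of bufSize. */
  public static int[] readTwice(byte[] data, int bufSize) {
    ByteArrayInputStream in = new ByteArrayInputStream(data);
    byte[] buf = new byte[bufSize];
    int first = in.read(buf, 0, bufSize);   // bytes actually read
    int second = in.read(buf, 0, bufSize);  // -1: end of stream
    return new int[] {first, second};
  }
}
```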

Author:

done

}
}

private String getIntList(Iterable<Integer> nums) {
Contributor:

Suggest adding javadoc to describe what the method does.

private String getIntList(Iterable<Integer> nums) {
List<String> numList = new ArrayList<>();
List<Integer> numbers = new ArrayList<Integer>();
for (Integer n : nums) {
Contributor:

nums.forEach(numbers::add) can be used to replace the loop.

Author:

Yes, it could, though I will keep it as-is for consistency with the rest of the code (which does not use streams).

Collections.sort(numbers);

int index = 0;
while (index < numbers.size()) {
Contributor:

The logic in this loop is not straightforward; suggest adding a comment on what it does.

Author:

should be clear based on the function header I added.
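For readers of the thread: the loop being discussed summarises a sorted list of block numbers as compact ranges. A hypothetical standalone equivalent (the output format shown, e.g. [1~3, 5], is illustrative; the PR's actual formatting may differ):

```java
import java.util.List;

// Collapses a sorted list of integers into a compact range string,
// e.g. [1, 2, 3, 5] becomes "[1~3, 5]". Illustrative sketch only;
// the PR's helper may format ranges differently.
public final class RangeFormatter {

  private RangeFormatter() {
  }

  public static String toRangeString(List<Integer> sorted) {
    StringBuilder sb = new StringBuilder("[");
    int i = 0;
    while (i < sorted.size()) {
      int start = sorted.get(i);
      int end = start;
      // extend the current run while the next value is consecutive
      while (i + 1 < sorted.size() && sorted.get(i + 1) == end + 1) {
        end = sorted.get(++i);
      }
      if (sb.length() > 1) {
        sb.append(", ");
      }
      sb.append(start == end ? String.valueOf(start) : start + "~" + end);
      i++;
    }
    return sb.append("]").toString();
  }
}
```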

/**
* Holds information about blocks of data in a file.
*/
public class BlockData {
Contributor:

Suggest adding a more detailed description of the values. For example NOT_READY: what is the meaning of ready? What is the meaning of queued? What is the difference between ready and cached?

Author:

added more details
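To make the lifecycle question concrete, the states discussed might be sketched as a simple enum. The names follow the review comments above; the PR's actual enum may differ:

```java
// Hypothetical sketch of a prefetch block's lifecycle, based on the state
// names mentioned in the review; not the PR's actual code.
public enum BlockState {
  NOT_READY,  // block exists in the file but nothing has been requested yet
  QUEUED,     // an asynchronous prefetch task has been submitted for it
  READY,      // the block's bytes are in memory and reads can be served
  CACHED      // the block has been persisted to the local disk cache
}
```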

public class BlockOperations {
private static final Logger LOG = LoggerFactory.getLogger(BlockOperations.class);

public enum Kind {
Contributor:

Please add a clear description of what each of these operations does. Though some seem intuitive, others are not.

And it would be good to describe the lifecycle of a block data somewhere.

Thanks

Author:

This class is for debugging/logging purposes only. It is not a part of the public interface. I added explanation in the class header.

@steveloughran

I'm not deliberately ignoring you, just so overloaded with my own work that I've not been able to review anything.

we've put mukund's vectored IO patch into a feature branch with the idea being we will still do the normal review process before every patch, but we can get that short chain of patches lined up before the big merge into trunk. we can also play rebase and interactive rebase too.

would that work for you too? so patch 1 is "take what there is", patch 2+ being the changes we want in before shipping

the risk is always it gets forgotten, so we still need to push hard to get it into a state where it can be used as an option, the goal being "no side effects if not used", including nothing extra on the classpath.

meanwhile, have a look at this #2584

its a big patch, but a key feature is you can declare your read policy, with whole-file being an option as well as random, sequential and vectored.

distcp and the CLI tools all declare their read plans this way

i'd like this in before both the vectored io and your stream, so you can use it to help decide whether to cache etc, and to support custom options as well as parse the newly defined standard ones

@bhalchandrap

bhalchandrap commented Feb 18, 2022 via email

@apache apache deleted a comment from hadoop-yetus Mar 25, 2022
@steveloughran

I've created a feature branch for this and a PR which takes this change (with test file renames & some other small changes) as an initial PR to apply to it: #4109

can everyone who has outstanding issues/comments look at that patch and create some subtasks on the JIRA itself...we can fix the issues (test failures etc) in individual subtasks.

note: this feature branch can be rebased as we go along, though I'd like to get it in asap. I do want to get #2584 in first, which needs reviews from other people. That change should allow for the new input stream to decide policy from the openFile() parameters

@steveloughran

patch is merged in to the feature branch; closing this to avoid confusion.

thank you for a wonderful contribution!
