Contributor

@mehakmeet mehakmeet commented May 24, 2022

Description of PR

Adding thread-level IOStatistics in hadoop-common and implementing it in the S3A streams.

How was this patch tested?

Region: ap-south-1
mvn clean verify -Dparallel-tests -DtestsThreadCount=4 -Dscale

All tests ran fine.

For code changes:

  • Does the title of this PR start with the corresponding JIRA issue id (e.g. 'HADOOP-17799. Your PR title ...')?
  • Object storage: have the integration tests been executed and the endpoint declared according to the connector-specific documentation?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?

@apache apache deleted a comment from hadoop-yetus Jun 9, 2022
Contributor

@steveloughran steveloughran left a comment

main comment is that the thread's statistic aggregator should be fetched/stored in constructor, not in close. indeed, it could maybe be passed in. when thread level is disabled, s3a fs would just pass in an EmptyIOStatisticsStore whose aggregation is a noop.
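
A minimal Java sketch of that suggestion, for illustration only. Every type here (`SketchAggregator`, `NoopSketchAggregator`, `SummingSketchAggregator`, `ClosingStream`) is a hypothetical stand-in rather than the hadoop-common API. It shows a stream capturing its aggregator in the constructor and updating it in close(), with a no-op aggregator standing in for the "disabled" case.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a statistics aggregator; not the Hadoop interface. */
interface SketchAggregator {
  /** Merge a set of counters; returns true if anything was merged. */
  boolean aggregate(Map<String, Long> counters);
}

/** Used when thread-level collection is disabled: aggregation does nothing. */
final class NoopSketchAggregator implements SketchAggregator {
  @Override
  public boolean aggregate(Map<String, Long> counters) {
    return false;
  }
}

/** Sums counters by name. */
final class SummingSketchAggregator implements SketchAggregator {
  private final Map<String, AtomicLong> totals = new ConcurrentHashMap<>();

  @Override
  public boolean aggregate(Map<String, Long> counters) {
    counters.forEach((k, v) ->
        totals.computeIfAbsent(k, key -> new AtomicLong()).addAndGet(v));
    return !counters.isEmpty();
  }

  long get(String key) {
    AtomicLong value = totals.get(key);
    return value == null ? 0L : value.get();
  }
}

/** A stream that captures the aggregator when built and updates it in close(). */
final class ClosingStream implements AutoCloseable {
  private final SketchAggregator aggregator;
  private final Map<String, Long> counters = new ConcurrentHashMap<>();

  ClosingStream(SketchAggregator aggregator) {
    // captured here, so close() on another thread still updates the creator's stats
    this.aggregator = aggregator;
  }

  void read(int bytes) {
    counters.merge("bytes_read", (long) bytes, Long::sum);
  }

  @Override
  public void close() {
    // a no-op when the aggregator is the disabled variant
    aggregator.aggregate(counters);
  }
}
```

With this shape the stream code never needs to check whether collection is enabled; the choice is made once, by whoever constructs the stream.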

proposed changes to the testing

Contributor

needs indentation

Contributor

nit: space between current and thread

@mehakmeet
Contributor Author

Thanks for the review @steveloughran, and sorry I couldn't address anything until now (I was a little ill).

main comment is that the thread's statistic aggregator should be fetched/stored in constructor, not in close

Got your point. Just one concern: should the IOStatisticsContext be a static instance in S3AFileSystem, with only the iostatisticsAggregator passed on to the streams? We would still need the context in the streams to update the WeakReferenceThreadMap after the aggregation, right?

@steveloughran
Contributor

we need a single IOStatisticsContext for all FS instances, so that a task reading from one fs and writing to another would have the stats updated from both actions.

And it needs to be in hadoop-common, just like the common audit context, so that code can compile and link against it even without having hadoop-aws or hadoop-azure on the classpath.

Making the context a weak ref map ensures that GCs will trigger cleanup. This would lose stats, but not while any stream was active, or if some code picked up a reference to the thread stats before executing work. That is what I plan to do in spark: we will grab that ref before starting the work and, after it is finished, take a snapshot of it. Oh, and reset the values before work starts; we need that context there too.
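
A rough Java sketch of the weak-reference idea, assuming a simple map from thread ID to `WeakReference`. The class `WeakThreadMap` is hypothetical and much simpler than the WeakReferenceThreadMap mentioned earlier in this thread. The point it illustrates is that an entry survives for as long as some caller holds a strong reference to the value, and is recreated on demand once it has been collected.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical map of per-thread values held only through weak references. */
final class WeakThreadMap<V> {
  private final Map<Long, WeakReference<V>> entries = new ConcurrentHashMap<>();
  private final Supplier<V> factory;

  WeakThreadMap(Supplier<V> factory) {
    this.factory = factory;
  }

  /** Return the value for the calling thread, creating it if absent or collected. */
  V getForCurrentThread() {
    long id = Thread.currentThread().getId();
    WeakReference<V> ref = entries.get(id);
    V value = ref == null ? null : ref.get();
    if (value == null) {
      // first use on this thread, or the old value was garbage collected
      value = factory.get();
      entries.put(id, new WeakReference<>(value));
    }
    return value;
  }
}
```

A caller that wants its statistics to outlive a GC keeps the returned object in a field or local variable for the duration of the work, which matches the "grab that ref before starting the work" plan above.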

Contributor

@steveloughran steveloughran left a comment

other than the need to move the thread context into hadoop common, everything here looks pretty complete

Contributor

yes

@mehakmeet
Contributor Author

Had to force push to resolve conflicts.

Some changes in the latest commit:

  • Moved thread context initialization and config for enabling in hadoop-common.
  • Weak ref thread map for the iostats context. I have a doubt here: would having both the context and the iostats as weak refs cause any issues, since both are now mapped to thread IDs? I was thinking in terms of one context per s3afs with iostats per thread.
  • Added new test
  • Disabling iostats returns an EmptyIOStatisticsStore instance with no-op aggregation.

@mehakmeet
Contributor Author

More merge conflicts due to vectored IO merging.
Interesting bit of info on the new test: it seems to fail in maven verify but works fine in IntelliJ. I remember Steve saying something about how tests run differently in mvn than in IntelliJ with respect to threads; I need to check that.

@apache apache deleted a comment from hadoop-yetus Jun 27, 2022
@apache apache deleted a comment from hadoop-yetus Jun 27, 2022
@apache apache deleted a comment from hadoop-yetus Jun 27, 2022
Contributor

@steveloughran steveloughran left a comment

Your tests show that we need the ability to reset the statistics for the current thread.

For the tests we wouldn't need any class-level setup. Instead, in the normal setup method we would get the current context stats and reset them. This is also exactly what we will need to do when collecting statistics in applications: grab a reference to that context before executing the work and reset it at that point, so that all statistics collected on it at the end of the work will have been added exclusively during that execution.

Proposed: IOStatisticsContext adds resetCurrentThreadStatistics() which does this, calling clear() on the snapshot.

ITestS3AIOStatisticsContext can resetCurrentThreadStatistics() in its normal setup().

Then have test cases which verify that stats are shared invoke some method which does the file create/read, taking in a path which can be derived off methodPath. That way: no hardcoded paths in the test suite.

Contributor

@steveloughran steveloughran left a comment

minor tuning of the reset operation, to make things slightly more efficient

@mehakmeet
Contributor Author

Changed the way streams access the thread IOStatistics: we now get them directly from the current active context in the stream's constructor rather than passing them around through the builders, as that didn't seem to add anything when we can get them directly thanks to the static nature of the context. Also made the weakRef an IOStatisticsSnapshot rather than an aggregator followed by a cast, as discussed above.

Contributor

@steveloughran steveloughran left a comment

I'm writing an initial PoC of stats collection in spark.

Spark wants to do incremental updates as a task goes along, which it does with a heartbeat thread invoking callbacks to collect the thread-level stats of the worker threads.

And we can wire that up simply by calling getThreadIOStatistics() in the worker thread, caching the value. But resetting after updates isn't so easy.

I think we are going to need a specific class for each thread which offers the aggregator, snapshot and reset operations.

Which we can do in the IOStatisticsContext by adding non-static state there, including reset() and snapshot() methods.

In the worker thread I would cache that value:

val workerContext: IOStatisticsContext = IOStatisticsContext.getContextForCurrentThread()

Then to update the task metrics, the operation would be

def getCurrentStatistics(): IOStatistics = {
  // snapshot the current values
  val current = workerContext.snapshot()
  // then reset the stats on the worker thread
  workerContext.reset()
  current
}

// which then is used to update the task metrics
taskMetrics.updateStatistics(getCurrentStatistics())
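
The same heartbeat pattern as a self-contained Java sketch. `HeartbeatStats` is a hypothetical class, not the Hadoop API, and combining snapshot and reset under one lock is an assumption of this sketch: it is one way to avoid dropping an update that lands between the two separate calls in the snippet above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-thread statistics holder; not the Hadoop class. */
final class HeartbeatStats {
  private final Map<String, Long> counters = new HashMap<>();

  /** Called by the worker thread as it performs IO. */
  synchronized void add(String key, long delta) {
    counters.merge(key, delta, Long::sum);
  }

  /**
   * Called by the heartbeat thread: copy the counters and clear them
   * under the same lock, so no update lands between the two steps.
   */
  synchronized Map<String, Long> snapshotAndReset() {
    Map<String, Long> copy = new HashMap<>(counters);
    counters.clear();
    return copy;
  }
}
```

Each call returns only the work done since the previous call, so summing the returned maps over a task's lifetime gives the task total.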

*
* @return the instance of IOStatisticsAggregator for the current thread.
*/
public IOStatisticsAggregator getThreadIOStatistics() {
Contributor

let's call this ThreadIOStatisticsAggregator

@apache apache deleted a comment from hadoop-yetus Jul 12, 2022
@steveloughran
Contributor

ok, i have some changes but I will add them as a commit for you to review/cherrypick...i want to make sure they line up with a local spark build.

key point: interfaces now support static methods, so looking up the current context can/should be a static method in IOStatisticsContext
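
As an illustration of that key point, here is a small Java sketch of an interface whose static method relays the lookup to a separate integration class. `LookupContext` and `LookupIntegration` are hypothetical names, and the sketch uses a plain `ThreadLocal` where the PR uses a weak reference map, purely to keep the example short.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical context interface; the static method is the public lookup point. */
interface LookupContext {
  long id();

  /** Relays to the integration class, which owns the per-thread state. */
  static LookupContext current() {
    return LookupIntegration.currentContext();
  }
}

/** Hypothetical integration class holding the per-thread binding. */
final class LookupIntegration {
  private static final AtomicLong NEXT_ID = new AtomicLong();
  private static final ThreadLocal<LookupContext> ACTIVE =
      ThreadLocal.withInitial(LookupIntegration::create);

  private LookupIntegration() {
  }

  private static LookupContext create() {
    final long id = NEXT_ID.incrementAndGet();
    return () -> id; // LookupContext has a single abstract method, id()
  }

  static LookupContext currentContext() {
    return ACTIVE.get();
  }
}
```

Callers only ever see the interface, so the storage strategy behind it can change without touching them.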

* move reference map and lookup to a IOStatisticsContextIntegration class
* static method in IOStatisticsContext to relay lookup
* add method to switch a thread's context; needed to aggregate worker thread
  IO in threads doing work for committers without the need to explicitly
  collect and pass back the stats
* production code moves to the new methods
* tests move to this and away from looking up the fields in the streams
* stats are reset in s3a test setup
* s3a committers collect data read stats during job commit and include
  in summary statistics. This is only the stats when reading manifest
  files, not the actual work.
* tests to print the aggregate of all loaded success files in the run.

Change-Id: I604990f2132b76d38e85ca8b777630225c32158e
* Cost of scan/load of magic files in task commit are collected
* S3A list iterators update the context stats of the thread they were
  created in when close() is called.
* With close() passthrough working and TaskPool invoking it if the iterator
  is closeable.

Change-Id: If0a0c2de08d52a74b7c1f9498716d423b97b4003
@steveloughran
Contributor

This is my draft commit message btw


Adds a new IOStatistics class IOStatisticsContext.

This is the active collector of thread-level statistics for
the current thread.

The S3A Filesystem's input and output streams, and listing
operations, all update this context when close() is called on
them (and not before!), so there is effectively automatic
aggregation of all IO statistics performed by a single thread.

The IOStatisticsContext of a thread can be retrieved and
cached for invocation in other threads. Holding such a
reference also ensures that the context will not be garbage
collected.

To collect statistics on a thread:

  1. Retrieve the active context with a call to
    IOStatisticsContext.getCurrentIOStatisticsContext()
  2. Call IOStatisticsContext.reset() to reset all statistics
  3. Call getIOStatistics() on it for the latest values, or
    snapshot() for a snapshot of them.

To instrument filesystem objects for thread-level
IOStatistics

  1. Cache the current IOStatisticsContext context or just its
    aggregator in the object constructor.
  2. In the close() operation, aggregate() the object's own statistics.
  3. Pass the context into worker threads performing work
    on behalf of this thread, through
    IOStatisticsContext.setThreadIOStatisticsContext();
    set it to null afterwards.

TaskPool does the context propagation and reset
automatically.

Contributed by Mehakmeet Singh
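
The third instrumentation step in the draft above, passing the context into worker threads and clearing it afterwards, could be sketched in Java roughly as follows. `WorkContext` and `WorkContextBinding` are hypothetical, and the `ThreadLocal` binding is an assumption of the sketch rather than how hadoop-common stores contexts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical context carrying a single counter. */
final class WorkContext {
  final AtomicLong bytesRead = new AtomicLong();
}

/** Hypothetical per-thread binding of the active context. */
final class WorkContextBinding {
  private static final ThreadLocal<WorkContext> ACTIVE = new ThreadLocal<>();

  private WorkContextBinding() {
  }

  /** The calling thread's context, created on first use. */
  static WorkContext current() {
    WorkContext ctx = ACTIVE.get();
    if (ctx == null) {
      ctx = new WorkContext();
      ACTIVE.set(ctx);
    }
    return ctx;
  }

  /** Bind a context to the calling thread; null removes the binding. */
  static void set(WorkContext ctx) {
    if (ctx == null) {
      ACTIVE.remove();
    } else {
      ACTIVE.set(ctx);
    }
  }

  /** Wrap work so it runs under the submitting thread's context. */
  static <T> Callable<T> propagating(Callable<T> work) {
    final WorkContext parent = current(); // captured in the submitting thread
    return () -> {
      set(parent);
      try {
        return work.call();
      } finally {
        set(null); // never leak the parent context into later tasks
      }
    };
  }
}
```

Any IO the wrapped task records through `WorkContextBinding.current()` lands in the submitting thread's context, and the pool thread is left unbound for whatever runs on it next.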


+setting thread context to null resets it
+move merging of fs stats into finally block, after
streamStatistics close has been called to update final
stats

Change-Id: I913b6a473da12918025e7ec11d4168bd135f0fc5
@mehakmeet
Contributor Author

Pushed the changes; they make sense in the case of a null IOStatisticsContext. There is one more issue: with fs.thread.level.iostatistics.enabled=false the counters are empty, so the test in this PR fails, since it requires thread-level IOStatistics to be enabled. We now do removeBaseAndBucketOverrides(configuration, THREAD_LEVEL_IOSTATISTICS_ENABLED); configuration.setBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED, true);
But the toggle is evaluated in a static initializer of the IOStatisticsContextIntegration utility class, which runs before this, so setting the option afterwards has no effect. How do we ensure that this property is always enabled for this test? Reload the class using class loaders (not sure if this works; I tried using Thread.currentThread.getClassLoaderContext)?

Also:

Contributed by Mehakmeet Singh and Steve Loughran

@steveloughran
Contributor

good point.

how about adding a static method to enable it in the integration class?

disabling it would be harder...what if contexts had already been generated? easier to only allow the caller to go from off to on.
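
A minimal Java sketch of such an off-to-on switch. `OneWayToggle` and its property name are hypothetical, and reading a JVM system property in the static initializer is only a stand-in for reading the Hadoop configuration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical one-way switch: can be turned on at runtime, never off. */
final class OneWayToggle {
  /** Hypothetical property, read once when the class is initialised. */
  static final String PROPERTY = "sketch.thread.level.iostatistics.enabled";

  private static final AtomicBoolean ENABLED =
      new AtomicBoolean(Boolean.getBoolean(PROPERTY));

  private OneWayToggle() {
  }

  static boolean isEnabled() {
    return ENABLED.get();
  }

  /** Switch collection on; there is deliberately no disable(). */
  static void enable() {
    ENABLED.set(true);
  }
}
```

A test that needs collection can call `enable()` in its setup regardless of what the static initializer saw, and because nothing can switch it back off, contexts that already exist are never invalidated.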

also, had an idea:

add a stream capability fs.capability.iocontext.supported for streams which update iocontext.

then add asserts to the test to verify this

  1. lines up for making the tests contract tests
  2. if the prefetching stream isn't (yet) context aware, we will know why the tests are failing
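
A sketch of how a test might assert on that capability, in plain Java. The capability string is the one proposed above; `CapabilitySource`, `AwareStream`, `LegacyStream` and `CapabilityChecks` are hypothetical types for illustration and not the hadoop-common interfaces.

```java
/** Hypothetical probe interface, modelled on a hasCapability(String) check. */
interface CapabilitySource {
  boolean hasCapability(String capability);
}

/** Capability name as proposed in this thread. */
final class CapabilityNames {
  static final String IOCONTEXT = "fs.capability.iocontext.supported";

  private CapabilityNames() {
  }
}

/** A stream that declares it updates the thread's context. */
final class AwareStream implements CapabilitySource {
  @Override
  public boolean hasCapability(String capability) {
    return CapabilityNames.IOCONTEXT.equals(capability);
  }
}

/** A stream that has not (yet) been made context aware. */
final class LegacyStream implements CapabilitySource {
  @Override
  public boolean hasCapability(String capability) {
    return false;
  }
}

/** Test helper: fail fast with a message naming the offending stream class. */
final class CapabilityChecks {
  private CapabilityChecks() {
  }

  static void requireContextAware(CapabilitySource stream) {
    if (!stream.hasCapability(CapabilityNames.IOCONTEXT)) {
      throw new IllegalStateException(stream.getClass().getSimpleName()
          + " does not declare " + CapabilityNames.IOCONTEXT);
    }
  }
}
```

Running the check before any statistics assertions means a stream that is not context aware fails with a clear reason instead of a confusing counter mismatch.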

Contributor

@steveloughran steveloughran left a comment

looks good, some minor tweaks

@apache apache deleted a comment from hadoop-yetus Jul 25, 2022
Contributor

@steveloughran steveloughran left a comment

+1 pending the checkstyle fix

@steveloughran
Contributor

ok, one more change, as I write up something on integration. The RawLocal streams should support IOContext too.

why so? makes testing integration easier, e.g. for distcp, spark etc. no need to wait until object store tests.

This also lines up for moving ITestS3AIOStatisticsContext into hadoop common unit tests as a contract test.

I'm not going to make that a requirement of this PR, but for adding abfs in we should do that

@apache apache deleted a comment from hadoop-yetus Jul 26, 2022
@apache apache deleted a comment from hadoop-yetus Jul 26, 2022
@steveloughran
Contributor

-1 to the last patch. pick up 9dd221d from my PR

@steveloughran
Contributor

(the streams need to cache the aggregator and update in close)

mehakmeet and others added 2 commits July 26, 2022 16:43
This is important for testing IOStatisticsContext functionality in
unit tests as well as a source of actual data.

+fixed the checkstyle

Change-Id: I4f429a6a81729027026dc46bd1519f90a145c205
@steveloughran
Contributor

+1 pending yetus

@hadoop-yetus

🎊 +1 overall

| Vote | Subsystem | Runtime | Logfile | Comment |
|:----:|----------:|--------:|:-------:|:--------|
| +0 🆗 | reexec | 0m 44s | | Docker mode activated. |
| | | | | _ Prechecks _ |
| +1 💚 | dupname | 0m 0s | | No case conflicting files found. |
| +0 🆗 | codespell | 0m 1s | | codespell was not available. |
| +0 🆗 | detsecrets | 0m 1s | | detect-secrets was not available. |
| +1 💚 | @author | 0m 0s | | The patch does not contain any @author tags. |
| +1 💚 | test4tests | 0m 0s | | The patch appears to include 6 new or modified test files. |
| | | | | _ trunk Compile Tests _ |
| +0 🆗 | mvndep | 15m 39s | | Maven dependency ordering for branch |
| +1 💚 | mvninstall | 25m 30s | | trunk passed |
| +1 💚 | compile | 23m 9s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 💚 | compile | 20m 44s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 💚 | checkstyle | 4m 30s | | trunk passed |
| +1 💚 | mvnsite | 3m 44s | | trunk passed |
| +1 💚 | javadoc | 3m 1s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 💚 | javadoc | 2m 42s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 💚 | spotbugs | 5m 4s | | trunk passed |
| +1 💚 | shadedclient | 22m 11s | | branch has no errors when building and testing our client artifacts. |
| -0 ⚠️ | patch | 22m 42s | | Used diff version of patch file. Binary files and potentially other changes not applied. Please rebase and squash commits if necessary. |
| | | | | _ Patch Compile Tests _ |
| +0 🆗 | mvndep | 0m 30s | | Maven dependency ordering for patch |
| +1 💚 | mvninstall | 1m 46s | | the patch passed |
| +1 💚 | compile | 22m 31s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 💚 | javac | 22m 31s | | the patch passed |
| +1 💚 | compile | 21m 30s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 💚 | javac | 21m 30s | | the patch passed |
| +1 💚 | blanks | 0m 0s | | The patch has no blanks issues. |
| +1 💚 | checkstyle | 4m 17s | | root: The patch generated 0 new + 97 unchanged - 1 fixed = 97 total (was 98) |
| +1 💚 | mvnsite | 3m 24s | | the patch passed |
| +1 💚 | javadoc | 2m 21s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 💚 | javadoc | 2m 7s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 💚 | spotbugs | 4m 45s | | the patch passed |
| +1 💚 | shadedclient | 23m 30s | | patch has no errors when building and testing our client artifacts. |
| | | | | _ Other Tests _ |
| +1 💚 | unit | 18m 36s | | hadoop-common in the patch passed. |
| +1 💚 | unit | 3m 7s | | hadoop-aws in the patch passed. |
| +1 💚 | asflicense | 1m 26s | | The patch does not generate ASF License warnings. |
| | | 241m 14s | | |

| Subsystem | Report/Notes |
|----------:|:-------------|
| Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4352/17/artifact/out/Dockerfile |
| GITHUB PR | #4352 |
| Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell detsecrets |
| uname | Linux fca9fdd4589a 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | dev-support/bin/hadoop.sh |
| git revision | trunk / 0db1a76 |
| Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4352/17/testReport/ |
| Max. process+thread count | 1263 (vs. ulimit of 5500) |
| modules | C: hadoop-common-project/hadoop-common hadoop-tools/hadoop-aws U: . |
| Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4352/17/console |
| versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
| Powered by | Apache Yetus 0.14.0 https://yetus.apache.org |

This message was automatically generated.

@steveloughran steveloughran merged commit 4c8cd61 into apache:trunk Jul 26, 2022
@steveloughran
Contributor

ok, merged. Can you do a PR/cherrypick into branch-3.3 now?

@apache apache deleted a comment from hadoop-yetus Jul 26, 2022
@apache apache deleted a comment from hadoop-yetus Jul 26, 2022
@apache apache deleted a comment from hadoop-yetus Jul 26, 2022
@apache apache deleted a comment from hadoop-yetus Jul 26, 2022
mehakmeet added a commit to mehakmeet/hadoop that referenced this pull request Jul 27, 2022
This adds a thread-level collector of IOStatistics, IOStatisticsContext,
which can be:
* Retrieved for a thread and cached for access from other
  threads.
* reset() to record new statistics.
* Queried for live statistics through the
  IOStatisticsSource.getIOStatistics() method.
* Queried for a statistics aggregator for use in instrumented
  classes.
* Asked to create a serializable copy in snapshot()

The goal is to make it possible for applications with multiple
threads performing different work items simultaneously
to be able to collect statistics on the individual threads,
and so generate aggregate reports on the total work performed
for a specific job, query or similar unit of work.

Some changes in IOStatistics-gathering classes are needed for 
this feature
* Caching the active context's aggregator in the object's
  constructor
* Updating it in close()

Slightly more work is needed in multithreaded code,
such as the S3A committers, which collect statistics across
all threads used in task and job commit operations.

Currently the IOStatisticsContext-aware classes are:
* The S3A input stream, output stream and list iterators.
* RawLocalFileSystem's input and output streams.
* The S3A committers.
* The TaskPool class in hadoop-common, which propagates
  the active context into scheduled worker threads.

Collection of statistics in the IOStatisticsContext
is disabled process-wide by default until the feature 
is considered stable.

To enable the collection, set the option
fs.thread.level.iostatistics.enabled
to "true" in core-site.xml.

Contributed by Mehakmeet Singh and Steve Loughran
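
As a configuration fragment, enabling collection as the commit message describes would presumably look like the following core-site.xml entry. The property name is taken from the message above; the surrounding `<property>` layout is the standard Hadoop configuration form.

```xml
<property>
  <name>fs.thread.level.iostatistics.enabled</name>
  <value>true</value>
</property>
```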
steveloughran pushed a commit that referenced this pull request Jul 27, 2022
HarshitGupta11 pushed a commit to HarshitGupta11/hadoop that referenced this pull request Nov 28, 2022
deepakdamri pushed a commit to acceldata-io/hadoop that referenced this pull request Jan 21, 2025