[FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph #17735
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community

Automated Checks
Last check on commit 075f8ee (Tue Nov 09 12:28:16 UTC 2021)
Warnings:
Mention the bot in a comment to re-run the automated checks.

Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
dmvk
left a comment
The change LGTM overall 👍 My main question would be whether we could unify the JobStatus metrics between DefaultScheduler / AdaptiveScheduler / ExecutionGraph a bit more, by reusing the new JobStatusStore and registerMetrics().
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
    // wait for the second task submissions
    taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5));

    // sleep a bit to ensure uptime is > 0
What do you think about passing a Clock instance to the AdaptiveScheduler instead of relying on System.currentTimeMillis(), to simplify the test?
I generally like doing that, but I'm wondering if this would work properly for the AdaptiveScheduler in that truly all time-measurements go through the clock. For smaller self-contained components it is easy to ensure that, but this isn't the case here because we re-use some parts of the SchedulerBase/DefaultScheduler, there are multiple state classes, then internally there is the EG, ....
It would be a bit unsatisfactory to introduce a clock but only use it in one place :/
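As a rough illustration of the clock-injection idea discussed above, here is a minimal, self-contained sketch. It is not the AdaptiveScheduler code: `UptimeTracker`, `ManualTime`, and `UptimeTrackerDemo` are hypothetical names, and the time source is modeled as a plain `LongSupplier` rather than Flink's own clock abstraction. It only shows why a test no longer needs to sleep once every time measurement goes through the injected source.

```java
import java.util.function.LongSupplier;

/** Hypothetical uptime tracker; the time source is injected instead of calling System.currentTimeMillis(). */
final class UptimeTracker {
    private final LongSupplier clockMillis;
    private long runningSince = -1L; // -1 means "not running"

    UptimeTracker(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    void markRunning() {
        runningSince = clockMillis.getAsLong();
    }

    void markStopped() {
        runningSince = -1L;
    }

    /** Returns 0 while not running, otherwise the elapsed time since markRunning(). */
    long getUptimeMillis() {
        return runningSince < 0 ? 0L : clockMillis.getAsLong() - runningSince;
    }
}

/** Test-only time source (hypothetical) that advances only when told to. */
final class ManualTime implements LongSupplier {
    private long now;

    ManualTime(long start) {
        this.now = start;
    }

    void advance(long millis) {
        now += millis;
    }

    @Override
    public long getAsLong() {
        return now;
    }
}

class UptimeTrackerDemo {
    public static void main(String[] args) {
        ManualTime time = new ManualTime(1_000L);
        UptimeTracker tracker = new UptimeTracker(time);
        tracker.markRunning();
        time.advance(250L); // replaces "sleep a bit to ensure uptime is > 0"
        System.out.println(tracker.getUptimeMillis()); // prints 250
    }
}
```

In production code the same tracker would be constructed with `System::currentTimeMillis`. The concern raised in the reply still applies: the benefit only holds if every time measurement in the component goes through the injected source.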
dmvk
left a comment
LGTM, great job ;)
...runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
    registerMetrics();
    SchedulerBase.registerJobMetrics(
            jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts);
Does numRestarts need to be volatile? If I understand correctly, when the metric is accessed, e.g. via JMX, it is read by a different thread. Or is there some synchronization in the metrics system that I'm missing?
> Does numRestarts need to be volatile?

Technically yes, but we generally don't do it. It would be too expensive on the hot code paths (aka, we can't be consistent about it anyway), and we haven't had issues so far 🤷

> Or is there some synchronization in the metrics system that I'm missing?

There is none.
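For readers unfamiliar with the visibility question raised in this thread, the following is a minimal sketch of what the "technically yes" variant could look like. It is not what the PR does (the PR keeps the field non-volatile, as explained above), and `RestartCounter` and `RestartCounterDemo` are hypothetical names used only for illustration. The sketch assumes a single writer thread, which is why the non-atomic increment on a volatile field is acceptable here.

```java
import java.util.function.LongSupplier;

/** Hypothetical counter written by one scheduler thread and read by metric reporter threads. */
final class RestartCounter {
    // volatile: a read from another thread is guaranteed to observe the most recent write
    private volatile int numRestarts;

    /** Assumed to be called from a single thread only; volatile++ is not atomic across writers. */
    void onRestart() {
        numRestarts++;
    }

    /** Gauge-style view, analogous in shape to the lambda passed in the diff above. */
    LongSupplier asGauge() {
        return () -> (long) numRestarts;
    }
}

class RestartCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        RestartCounter counter = new RestartCounter();
        LongSupplier gauge = counter.asGauge();

        counter.onRestart();
        counter.onRestart();

        // Simulates a reporter (e.g. a JMX poller) reading the gauge from a different thread.
        Thread reporter = new Thread(() -> System.out.println("numRestarts = " + gauge.getAsLong()));
        reporter.start();
        reporter.join();
    }
}
```

Without `volatile`, the Java memory model does not promise when a reader thread sees an updated value; the trade-off described in the reply is that paying for that guarantee everywhere was judged too costly.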
Based on #17722.
The down-/up-/restartTime metrics are now set up in the schedulers instead of the ExecutionGraph, similar to the numRestart metrics.
To this end the AdaptiveScheduler now maintains its own set of state timestamps, according to the job state transitions that the scheduler advertises (i.e., they are not based on the ExecutionGraph).
This prevents collisions upon rescaling as they are now only registered once.
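To make the idea of scheduler-owned state timestamps more concrete, here is a small sketch under simplifying assumptions. It is not the `JobStatusStore` from this PR: `Status`, `StatusTimestamps`, and the simplified rules for `upTime`/`downTime` (uptime counted only while RUNNING, downtime only while RESTARTING) are hypothetical and chosen for brevity. The point is that the timestamps live in one object that survives rescaling, so metrics derived from it need to be registered only once.

```java
import java.util.EnumMap;
import java.util.Map;

/** Simplified, hypothetical set of job states (not Flink's JobStatus enum). */
enum Status { CREATED, RUNNING, RESTARTING, FINISHED }

/** Hypothetical store that records when each state was last entered. */
final class StatusTimestamps {
    private final Map<Status, Long> timestamps = new EnumMap<>(Status.class);
    private Status current;

    StatusTimestamps(long nowMillis) {
        transitionTo(Status.CREATED, nowMillis);
    }

    /** Records the transition time for the state the scheduler advertises. */
    void transitionTo(Status next, long nowMillis) {
        current = next;
        timestamps.put(next, nowMillis);
    }

    Status current() {
        return current;
    }

    /** Time spent in RUNNING so far, or 0 if the job is not currently running. */
    long upTime(long nowMillis) {
        return current == Status.RUNNING ? nowMillis - timestamps.get(Status.RUNNING) : 0L;
    }

    /** Time spent in RESTARTING so far, or 0 if the job is not currently restarting. */
    long downTime(long nowMillis) {
        return current == Status.RESTARTING ? nowMillis - timestamps.get(Status.RESTARTING) : 0L;
    }
}

class StatusTimestampsDemo {
    public static void main(String[] args) {
        StatusTimestamps ts = new StatusTimestamps(0L);
        ts.transitionTo(Status.RUNNING, 100L);
        System.out.println(ts.upTime(150L));   // prints 50
        ts.transitionTo(Status.RESTARTING, 200L);
        System.out.println(ts.downTime(260L)); // prints 60
        System.out.println(ts.upTime(260L));   // prints 0
    }
}
```

Because the gauges would read from this single store rather than from a per-attempt ExecutionGraph, replacing the graph during rescaling would not require registering the metrics again.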