Implement transport circuit breaking in aggregator #54610
Pinging @elastic/es-distributed (:Distributed/Network)
ywelsch
left a comment
Thanks Tim. I've left some comments.
server/src/main/java/org/elasticsearch/transport/InboundAggregator.java
    }

    private void incrementReservedBytes(int delta) {
        bytesToRelease.getAndAdd(delta);
I wonder if we should assert that this method is no longer called after close has been called.
Also, we should assert right now that this method is only called once.
I made a change and renamed the method to set.
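The two invariants requested in this thread (the reservation is set only once, and never after close) might be enforced along these lines. This is only a sketch: `ReservedBytes`, `setReservedBytes`, and `reserved` are hypothetical names, not the code merged in the PR, and the checks throw `IllegalStateException` instead of using `assert` so they are observable without `-ea`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for bytes reserved against a circuit breaker.
final class ReservedBytes {
    private final AtomicInteger bytesToRelease = new AtomicInteger(0);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Records the reservation. Allowed once, and never after close().
    // Note: the closed check and the set are not one atomic step; a
    // production version would need to decide how to handle that race.
    void setReservedBytes(int bytes) {
        if (closed.get()) {
            throw new IllegalStateException("reservation set after close");
        }
        // compareAndSet only succeeds while the value is still the initial 0.
        if (bytesToRelease.compareAndSet(0, bytes) == false) {
            throw new IllegalStateException("reservation already set");
        }
    }

    int reserved() {
        return bytesToRelease.get();
    }

    void close() {
        closed.set(true);
        bytesToRelease.set(0);
    }
}
```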
server/src/main/java/org/elasticsearch/transport/InboundAggregator.java
    CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
    if (reg.canTripCircuitBreaker()) {
        breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
        messageListener.onRequestReceived(requestId, action);
You've moved this out of the try block. Some implementations of this can throw an exception, though. I think we need to handle those.
My answer here is related to:
If a request is received before a node is accepting requests, an
exception is logged and the channel is closed. Currently we respond with
an exception. But this is dangerous as we cannot negotiate a version.
and #54610 (comment). There is only one usage of this listener, and it happens at a place where it is not safe to respond with the exception.
We can theoretically respond with an exception after the version handshake message. So I can call the listeners in different places if you would like.
    private static void sendErrorResponse(String actionName, TransportChannel transportChannel, Exception e) {
        try {
            transportChannel.sendResponse(e);
        } catch (IOException inner) {
Should we catch Exception here? We probably never want to bubble anything up here.
I did not make this change. I think even the practice of catching IOException here is bad and will need to be addressed in a follow up. We DO want exceptions like this to be bubbled up.
If we cannot send a response we need to kill the channel. Bubbled up exceptions will kill channels.
Essentially, quite a bit of this exception handling in InboundHandler I think has issues but is beyond the scope of my current PR.
- Unless we can successfully send a handshake response, we should not catch and handle errors during version handshakes. Doing that leads to things like this (Failed transport handshake between 6.8 and 7.6 nodes may throw a fatal AssertionError #54337). Errors during handshakes should be logged at a high level and kill the channel.
- Unknown network errors that prevent a response like the one you are referencing here should not be caught and handled. They should be bubbled up, logged, and the channel killed to send some level of notification to the other node.
These issues are beyond the scope of my PR. But I intend to address them in a follow-up.
I actually did make this change, since it looks like we were catching Exception before when this happened at the application layer, but were only catching IOException for things before the application layer. I do think this still needs to be ironed out to ensure that a failure to send a response does not leave us hanging.
I did a little more clean-up around here. I still think we need a follow-up dedicated to exception handling. But I tried to maintain consistent behavior while moving in the correct direction.
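One way to reconcile the two positions in this thread (catch everything, but never leave the remote node hanging) is to catch `Exception` and close the channel when the error response itself cannot be sent. The sketch below assumes a hypothetical `ResponseChannel` interface and `ErrorResponses` class; it is not the Elasticsearch `TransportChannel` API, and it is not necessarily what the follow-up work will do.

```java
// Hypothetical, minimal stand-in for a transport channel.
interface ResponseChannel {
    void sendResponse(Exception e) throws Exception;

    void close();
}

final class ErrorResponses {
    // Tries to report the failure to the remote side. If even that fails,
    // the channel is closed so the remote node notices the problem instead
    // of waiting for a response that will never arrive.
    static boolean sendErrorResponse(String actionName, ResponseChannel channel, Exception e) {
        try {
            channel.sendResponse(e);
            return true;
        } catch (Exception inner) {
            // Keep the original failure attached for diagnostics.
            inner.addSuppressed(e);
            System.err.println("Failed to send error response for action [" + actionName + "]: " + inner);
            channel.close();
            return false;
        }
    }
}
```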
    @Override
    public void close() {
        final int toRelease = bytesToRelease.getAndSet(0);
I think we should protect against double-closing here, given how important it is to do this correctly.
I made a change.
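A common way to protect a release path against double-closing is a compare-and-set on a closed flag, so the bytes are returned to the breaker exactly once and a second close is detected, not silently ignored. The following is a sketch under that assumption; `CountingBreaker` and `BreakerRelease` are hypothetical names and do not reflect the change actually made in the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a breaker's byte accounting.
final class CountingBreaker {
    final AtomicLong used = new AtomicLong();

    void add(long bytes) {
        used.addAndGet(bytes);
    }
}

// Reserves bytes on construction and gives them back on close().
final class BreakerRelease implements AutoCloseable {
    private final CountingBreaker breaker;
    private final int bytes;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    BreakerRelease(CountingBreaker breaker, int bytes) {
        this.breaker = breaker;
        this.bytes = bytes;
        breaker.add(bytes);
    }

    @Override
    public void close() {
        // Only the first caller flips the flag and releases the bytes.
        if (closed.compareAndSet(false, true)) {
            breaker.add(-bytes);
        } else {
            throw new IllegalStateException("breaker release closed twice");
        }
    }
}
```

Whether a second close should throw, trip an assertion, or be a no-op is a design choice; throwing makes accounting bugs visible early, while a no-op is more forgiving on shared close paths.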
    public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
        if (uncaughtException != null) {
            throw new IllegalStateException("Pipeline state corrupted by uncaught exception", uncaughtException);
When do we expect this to happen? Should we assert false here?
Theoretically, I think this could happen on the HTTP-on-transport error. With all of the async handling involved here, I thought it was appropriate to throw an IllegalStateException, but did not want to be so strict as to add an assertion, since we are very dependent on the different implementations (Mock, Nio, and Netty) for the channel close path.
ywelsch
left a comment
I've left one more comment; looking good otherwise.
Would appreciate a second pair of eyes from @original-brownbear, as this is critical infrastructure code.
    try (Releasable breakerRelease = message.takeBreakerReleaseControl()) {
        final TransportChannel transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
            header.isCompressed(), header.isHandshake(), () -> {});
        handshaker.handleHandshake(transportChannel, requestId, stream);
If TransportHandshaker.handleHandshake throws an exception (e.g. IllegalStateException), that is no longer bubbled up back to the node that initiated the handshake. I'm not sure what this change of behavior entails, but would suggest backing it out of this PR.
Made this change.
original-brownbear
left a comment
I think the code is ok, but I'm a little uneasy about the plan here:
incrementally circuit breaking as data is received
Are we sure this is what we want in the first place? It seems to me what we really want is to circuit break before reading full messages.
If we circuit break incrementally, that means we're blowing through a bunch of buffer space only to then throw away a message mid-way. How would we ensure liveness here? If a node has a number of large messages come in concurrently, it will never process any of them but waste buffers for all of them for large (smaller than available memory) message sizes given enough concurrent messages?
Wouldn't we instead want to check the circuit breaker in headerReceived? That way we can circuit break a message without wasting all the buffers for it. We can increment the circuit breaker right after reading the header.
If we trip it, we just drop/release all the bytes in the remainder of the message as we aggregate and prevent needlessly holding on to buffered bytes that will never be deserialized.
If we don't trip it, we would've incremented it already before reading the message and would have the guarantee that the buffers we start holding on to for it will not be wasted?
=> to me it seems like that's what we want and could have in this PR at low cost with the changes it introduces.
This commit does not change any circuit breaking behavior. I imagine the end result of circuit breaking will be a combination of incremental breaking and pre-breaking on bytes that we know we are about to receive. The header int will inform this work, but it is not always present or accurate (compression), and there is some tension between that and our current usage of the MXBeans heap stats to circuit break. So the end result I think will look something like what you're describing. But there are some complications, and this is only an infrastructure PR.
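The header-time approach proposed in this thread could look roughly like the sketch below: reserve the declared message length as soon as the header arrives, and if the breaker trips, discard the remaining content instead of buffering it. Everything here is an assumption for illustration (`SimpleBreaker`, `HeaderFirstAggregator`, and their methods are hypothetical), and it ignores the complications raised in the reply: the declared length may be absent or inaccurate under compression.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical breaker: a byte budget with compare-and-set accounting.
final class SimpleBreaker {
    private final long limit;
    private final AtomicLong used = new AtomicLong();

    SimpleBreaker(long limit) {
        this.limit = limit;
    }

    // Reserves the bytes if they fit under the limit; returns false otherwise.
    boolean tryReserve(long bytes) {
        while (true) {
            long current = used.get();
            if (current + bytes > limit) {
                return false;
            }
            if (used.compareAndSet(current, current + bytes)) {
                return true;
            }
        }
    }

    void release(long bytes) {
        used.addAndGet(-bytes);
    }

    long used() {
        return used.get();
    }
}

// Hypothetical per-channel aggregator that decides at header time.
final class HeaderFirstAggregator {
    private final SimpleBreaker breaker;
    private long reserved;
    private long buffered;
    private boolean dropping;

    HeaderFirstAggregator(SimpleBreaker breaker) {
        this.breaker = breaker;
    }

    // Called once the header is decoded and the message length is known.
    void headerReceived(long declaredLength) {
        dropping = breaker.tryReserve(declaredLength) == false;
        reserved = dropping ? 0 : declaredLength;
        buffered = 0;
    }

    // Content for a tripped message is discarded instead of buffered.
    void contentReceived(int length) {
        if (dropping == false) {
            buffered += length;
        }
    }

    // Called when the message has been fully handled (or fully dropped).
    void finish() {
        breaker.release(reserved);
        reserved = 0;
        buffered = 0;
        dropping = false;
    }

    boolean isDropping() {
        return dropping;
    }

    long buffered() {
        return buffered;
    }
}
```

With a 100-byte budget, a first message declaring 80 bytes is accepted and buffered, while a concurrent message declaring 50 bytes is dropped at its header and never occupies buffer space.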
original-brownbear
left a comment
LGTM, thanks for the explanations Tim! Definitely more flexible to have the circuit breaking further up stream no matter what we do going forward :)
ywelsch
left a comment
LGTM
This commit moves the action name validation and circuit breaking into the InboundAggregator. This work is valuable because it lays the groundwork for incrementally circuit breaking as data is received. This PR includes the following behavioral change: Handshakes contribute to circuit breaking, but cannot be broken. They currently do not contribute nor are they broken.