Skip to content

Commit ee1113c

Browse files
committed
Tweak AggregatorBase.addRequestCircuitBreakerBytes
This modifies a method Mark added to the AggregatorBase that allows aggregations to add additional memory tracking for datastructures used during execution. If an aggregation would like to reclaim circuit breaker reserved bytes by adding a negative number, `addWithoutBreaking` should be used instead of `addEstimateBytesAndMaybeBreak`. Resolves #24511
1 parent bb66f3b commit ee1113c

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -109,26 +109,30 @@ public boolean needsScores() {
109109
}
110110

111111
/**
112-
* Increment the number of bytes that have been allocated to service this request
113-
* and potentially trigger a {@link CircuitBreakingException}. The number of bytes
114-
* allocated is automatically decremented with the circuit breaker service on
115-
* closure of this aggregator.
112+
* Increment or decrement the number of bytes that have been allocated to service
113+
* this request and potentially trigger a {@link CircuitBreakingException}. The
114+
* number of bytes allocated is automatically decremented with the circuit breaker
115+
* service on closure of this aggregator.
116+
* If memory has been returned, decrement it without tripping the breaker.
116117
* For performance reasons subclasses should not call this millions of times
117118
* each with small increments and instead batch up into larger allocations.
118119
*
119-
* @param bytesAllocated the number of additional bytes allocated
120+
* @param bytes the number of bytes to register or negative to deregister the bytes
120121
* @return the cumulative size in bytes allocated by this aggregator to service this request
121122
*/
122-
protected long addRequestCircuitBreakerBytes(long bytesAllocated) {
123-
try {
123+
protected long addRequestCircuitBreakerBytes(long bytes) {
124+
// Only use the potential to circuit break if bytes are being incremented
125+
if (bytes > 0) {
126+
this.breakerService
127+
.getBreaker(CircuitBreaker.REQUEST)
128+
.addEstimateBytesAndMaybeBreak(bytes, "<agg [" + name + "]>");
129+
} else {
124130
this.breakerService
125131
.getBreaker(CircuitBreaker.REQUEST)
126-
.addEstimateBytesAndMaybeBreak(bytesAllocated, "<agg [" + name + "]>");
127-
this.requestBytesUsed += bytesAllocated;
128-
return requestBytesUsed;
129-
} catch (CircuitBreakingException cbe) {
130-
throw cbe;
131-
}
132+
.addWithoutBreaking(bytes);
133+
}
134+
this.requestBytesUsed += bytes;
135+
return requestBytesUsed;
132136
}
133137
/**
134138
* Most aggregators don't need scores, make sure to extend this method if

0 commit comments

Comments
 (0)