1919
2020package org .elasticsearch .transport ;
2121
22+ import org .elasticsearch .common .breaker .CircuitBreaker ;
23+ import org .elasticsearch .common .breaker .CircuitBreakingException ;
2224import org .elasticsearch .common .bytes .BytesArray ;
2325import org .elasticsearch .common .bytes .CompositeBytesReference ;
2426import org .elasticsearch .common .bytes .ReleasableBytesReference ;
2729
2830import java .io .IOException ;
2931import java .util .ArrayList ;
32+ import java .util .concurrent .atomic .AtomicInteger ;
33+ import java .util .function .Function ;
34+ import java .util .function .Predicate ;
35+ import java .util .function .Supplier ;
3036
3137public class InboundAggregator implements Releasable {
3238
39+ private final Supplier <CircuitBreaker > circuitBreaker ;
40+ private final Predicate <String > requestCanTripBreaker ;
41+
3342 private ReleasableBytesReference firstContent ;
3443 private ArrayList <ReleasableBytesReference > contentAggregation ;
3544 private Header currentHeader ;
45+ private Exception aggregationException ;
46+ private boolean canTripBreaker = true ;
3647 private boolean isClosed = false ;
3748
49+ public InboundAggregator (Supplier <CircuitBreaker > circuitBreaker ,
50+ Function <String , RequestHandlerRegistry <TransportRequest >> registryFunction ) {
51+ this (circuitBreaker , (Predicate <String >) actionName -> {
52+ final RequestHandlerRegistry <TransportRequest > reg = registryFunction .apply (actionName );
53+ if (reg == null ) {
54+ throw new ActionNotFoundTransportException (actionName );
55+ } else {
56+ return reg .canTripCircuitBreaker ();
57+ }
58+ });
59+ }
60+
61+ // Visible for testing
62+ InboundAggregator (Supplier <CircuitBreaker > circuitBreaker , Predicate <String > requestCanTripBreaker ) {
63+ this .circuitBreaker = circuitBreaker ;
64+ this .requestCanTripBreaker = requestCanTripBreaker ;
65+ }
66+
3867 public void headerReceived (Header header ) {
3968 ensureOpen ();
4069 assert isAggregating () == false ;
4170 assert firstContent == null && contentAggregation == null ;
4271 currentHeader = header ;
72+ if (currentHeader .isRequest () && currentHeader .needsToReadVariableHeader () == false ) {
73+ initializeRequestState ();
74+ }
4375 }
4476
4577 public void aggregate (ReleasableBytesReference content ) {
4678 ensureOpen ();
4779 assert isAggregating ();
48- if (isFirstContent ()) {
49- firstContent = content .retain ();
50- } else {
51- if (contentAggregation == null ) {
52- contentAggregation = new ArrayList <>(4 );
53- contentAggregation .add (firstContent );
54- firstContent = null ;
80+ if (isShortCircuited () == false ) {
81+ if (isFirstContent ()) {
82+ firstContent = content .retain ();
83+ } else {
84+ if (contentAggregation == null ) {
85+ contentAggregation = new ArrayList <>(4 );
86+ assert firstContent != null ;
87+ contentAggregation .add (firstContent );
88+ firstContent = null ;
89+ }
90+ contentAggregation .add (content .retain ());
5591 }
56- contentAggregation .add (content .retain ());
5792 }
5893 }
5994
60- public Header cancelAggregation () {
61- ensureOpen ();
62- assert isAggregating ();
63- final Header header = this .currentHeader ;
64- closeCurrentAggregation ();
65- return header ;
66- }
67-
6895 public InboundMessage finishAggregation () throws IOException {
6996 ensureOpen ();
7097 final ReleasableBytesReference releasableContent ;
@@ -77,16 +104,30 @@ public InboundMessage finishAggregation() throws IOException {
77104 final CompositeBytesReference content = new CompositeBytesReference (references );
78105 releasableContent = new ReleasableBytesReference (content , () -> Releasables .close (references ));
79106 }
80- final InboundMessage aggregated = new InboundMessage (currentHeader , releasableContent );
81- resetCurrentAggregation ();
107+
108+ final BreakerControl breakerControl = new BreakerControl (circuitBreaker );
109+ final InboundMessage aggregated = new InboundMessage (currentHeader , releasableContent , breakerControl );
82110 boolean success = false ;
83111 try {
84112 if (aggregated .getHeader ().needsToReadVariableHeader ()) {
85113 aggregated .getHeader ().finishParsingHeader (aggregated .openOrGetStreamInput ());
114+ if (aggregated .getHeader ().isRequest ()) {
115+ initializeRequestState ();
116+ }
117+ }
118+ if (isShortCircuited () == false ) {
119+ checkBreaker (aggregated .getHeader (), aggregated .getContentLength (), breakerControl );
120+ }
121+ if (isShortCircuited ()) {
122+ aggregated .close ();
123+ success = true ;
124+ return new InboundMessage (aggregated .getHeader (), aggregationException );
125+ } else {
126+ success = true ;
127+ return aggregated ;
86128 }
87- success = true ;
88- return aggregated ;
89129 } finally {
130+ resetCurrentAggregation ();
90131 if (success == false ) {
91132 aggregated .close ();
92133 }
@@ -97,6 +138,14 @@ public boolean isAggregating() {
97138 return currentHeader != null ;
98139 }
99140
141+ private void shortCircuit (Exception exception ) {
142+ this .aggregationException = exception ;
143+ }
144+
145+ private boolean isShortCircuited () {
146+ return aggregationException != null ;
147+ }
148+
100149 private boolean isFirstContent () {
101150 return firstContent == null && contentAggregation == null ;
102151 }
@@ -108,23 +157,90 @@ public void close() {
108157 }
109158
110159 private void closeCurrentAggregation () {
160+ releaseContent ();
161+ resetCurrentAggregation ();
162+ }
163+
164+ private void releaseContent () {
111165 if (contentAggregation == null ) {
112166 Releasables .close (firstContent );
113167 } else {
114168 Releasables .close (contentAggregation );
115169 }
116- resetCurrentAggregation ();
117170 }
118171
119172 private void resetCurrentAggregation () {
120173 firstContent = null ;
121174 contentAggregation = null ;
122175 currentHeader = null ;
176+ aggregationException = null ;
177+ canTripBreaker = true ;
123178 }
124179
125180 private void ensureOpen () {
126181 if (isClosed ) {
127182 throw new IllegalStateException ("Aggregator is already closed" );
128183 }
129184 }
185+
186+ private void initializeRequestState () {
187+ assert currentHeader .needsToReadVariableHeader () == false ;
188+ assert currentHeader .isRequest ();
189+ if (currentHeader .isHandshake ()) {
190+ canTripBreaker = false ;
191+ return ;
192+ }
193+
194+ final String actionName = currentHeader .getActionName ();
195+ try {
196+ canTripBreaker = requestCanTripBreaker .test (actionName );
197+ } catch (ActionNotFoundTransportException e ) {
198+ shortCircuit (e );
199+ }
200+ }
201+
202+ private void checkBreaker (final Header header , final int contentLength , final BreakerControl breakerControl ) {
203+ if (header .isRequest () == false ) {
204+ return ;
205+ }
206+ assert header .needsToReadVariableHeader () == false ;
207+
208+ if (canTripBreaker ) {
209+ try {
210+ circuitBreaker .get ().addEstimateBytesAndMaybeBreak (contentLength , header .getActionName ());
211+ breakerControl .setReservedBytes (contentLength );
212+ } catch (CircuitBreakingException e ) {
213+ shortCircuit (e );
214+ }
215+ } else {
216+ circuitBreaker .get ().addWithoutBreaking (contentLength );
217+ breakerControl .setReservedBytes (contentLength );
218+ }
219+ }
220+
221+ private static class BreakerControl implements Releasable {
222+
223+ private static final int CLOSED = -1 ;
224+
225+ private final Supplier <CircuitBreaker > circuitBreaker ;
226+ private final AtomicInteger bytesToRelease = new AtomicInteger (0 );
227+
228+ private BreakerControl (Supplier <CircuitBreaker > circuitBreaker ) {
229+ this .circuitBreaker = circuitBreaker ;
230+ }
231+
232+ private void setReservedBytes (int reservedBytes ) {
233+ final boolean set = bytesToRelease .compareAndSet (0 , reservedBytes );
234+ assert set : "Expected bytesToRelease to be 0, found " + bytesToRelease .get ();
235+ }
236+
237+ @ Override
238+ public void close () {
239+ final int toRelease = bytesToRelease .getAndSet (CLOSED );
240+ assert toRelease != CLOSED ;
241+ if (toRelease > 0 ) {
242+ circuitBreaker .get ().addWithoutBreaking (-toRelease );
243+ }
244+ }
245+ }
130246}
0 commit comments