2323import org .elasticsearch .common .io .stream .StreamInput ;
2424import org .elasticsearch .common .io .stream .StreamOutput ;
2525import org .elasticsearch .common .io .stream .Writeable ;
26+ import org .elasticsearch .common .logging .DeprecationLogger ;
2627import org .elasticsearch .common .logging .ESLoggerFactory ;
2728import org .elasticsearch .common .settings .Setting ;
2829import org .elasticsearch .common .settings .Setting .Property ;
2930import org .elasticsearch .common .settings .Settings ;
31+ import org .elasticsearch .common .unit .ByteSizeValue ;
32+ import org .elasticsearch .http .HttpTransportSettings ;
33+
34+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_COUNT ;
35+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE ;
3036
3137import java .io .Closeable ;
3238import java .io .IOException ;
3945import java .util .Set ;
4046import java .util .concurrent .CancellationException ;
4147import java .util .concurrent .ExecutionException ;
42- import java .util .concurrent .FutureTask ;
4348import java .util .concurrent .RunnableFuture ;
4449import java .util .concurrent .atomic .AtomicBoolean ;
4550import java .util .function .Function ;
4651import java .util .function .Supplier ;
4752import java .util .stream .Collectors ;
4853import java .util .stream .Stream ;
54+ import java .nio .charset .StandardCharsets ;
55+
4956
5057/**
5158 * A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
@@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable {
8188 private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct ();
8289 private final Map <String , String > defaultHeader ;
8390 private final ContextThreadLocal threadLocal ;
91+ private final int maxWarningHeaderCount ;
92+ private final long maxWarningHeaderSize ;
8493
8594 /**
8695 * Creates a new ThreadContext instance
@@ -98,6 +107,8 @@ public ThreadContext(Settings settings) {
98107 this .defaultHeader = Collections .unmodifiableMap (defaultHeader );
99108 }
100109 threadLocal = new ContextThreadLocal ();
110+ this .maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT .get (settings );
111+ this .maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE .get (settings ).getBytes ();
101112 }
102113
103114 @ Override
@@ -282,7 +293,7 @@ public void addResponseHeader(final String key, final String value) {
282293 * @param uniqueValue the function that produces de-duplication values
283294 */
284295 public void addResponseHeader (final String key , final String value , final Function <String , String > uniqueValue ) {
285- threadLocal .set (threadLocal .get ().putResponse (key , value , uniqueValue ));
296+ threadLocal .set (threadLocal .get ().putResponse (key , value , uniqueValue , maxWarningHeaderCount , maxWarningHeaderSize ));
286297 }
287298
288299 /**
@@ -359,7 +370,7 @@ private static final class ThreadContextStruct {
359370 private final Map <String , Object > transientHeaders ;
360371 private final Map <String , List <String >> responseHeaders ;
361372 private final boolean isSystemContext ;
362-
373+ private long warningHeadersSize ; //saving current warning headers' size not to recalculate the size with every new warning header
363374 private ThreadContextStruct (StreamInput in ) throws IOException {
364375 final int numRequest = in .readVInt ();
365376 Map <String , String > requestHeaders = numRequest == 0 ? Collections .emptyMap () : new HashMap <>(numRequest );
@@ -371,6 +382,7 @@ private ThreadContextStruct(StreamInput in) throws IOException {
371382 this .responseHeaders = in .readMapOfLists (StreamInput ::readString , StreamInput ::readString );
372383 this .transientHeaders = Collections .emptyMap ();
373384 isSystemContext = false ; // we never serialize this it's a transient flag
385+ this .warningHeadersSize = 0L ;
374386 }
375387
376388 private ThreadContextStruct setSystemContext () {
@@ -387,6 +399,18 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
387399 this .responseHeaders = responseHeaders ;
388400 this .transientHeaders = transientHeaders ;
389401 this .isSystemContext = isSystemContext ;
402+ this .warningHeadersSize = 0L ;
403+ }
404+
405+ private ThreadContextStruct (Map <String , String > requestHeaders ,
406+ Map <String , List <String >> responseHeaders ,
407+ Map <String , Object > transientHeaders , boolean isSystemContext ,
408+ long warningHeadersSize ) {
409+ this .requestHeaders = requestHeaders ;
410+ this .responseHeaders = responseHeaders ;
411+ this .transientHeaders = transientHeaders ;
412+ this .isSystemContext = isSystemContext ;
413+ this .warningHeadersSize = warningHeadersSize ;
390414 }
391415
392416 /**
@@ -440,30 +464,58 @@ private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers
440464 return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext );
441465 }
442466
443- private ThreadContextStruct putResponse (final String key , final String value , final Function <String , String > uniqueValue ) {
467+ private ThreadContextStruct putResponse (final String key , final String value , final Function <String , String > uniqueValue ,
468+ final int maxWarningHeaderCount , final long maxWarningHeaderSize ) {
444469 assert value != null ;
470+ long newWarningHeaderSize = warningHeadersSize ;
471+ //check if we can add another warning header - if max size within limits
472+ if (key .equals ("Warning" ) && (maxWarningHeaderSize != -1 )) { //if size is NOT unbounded, check its limits
473+ if (warningHeadersSize > maxWarningHeaderSize ) { // if max size has already been reached before
474+ final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
475+ maxWarningHeaderSize + "] bytes set in [" +
476+ HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE .getKey () + "]!" ;
477+ ESLoggerFactory .getLogger (ThreadContext .class ).warn (message );
478+ return this ;
479+ }
480+ newWarningHeaderSize += "Warning" .getBytes (StandardCharsets .UTF_8 ).length + value .getBytes (StandardCharsets .UTF_8 ).length ;
481+ if (newWarningHeaderSize > maxWarningHeaderSize ) {
482+ final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
483+ maxWarningHeaderSize + "] bytes set in [" +
484+ HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE .getKey () + "]!" ;
485+ ESLoggerFactory .getLogger (ThreadContext .class ).warn (message );
486+ return new ThreadContextStruct (requestHeaders , responseHeaders , transientHeaders , isSystemContext , newWarningHeaderSize );
487+ }
488+ }
445489
446490 final Map <String , List <String >> newResponseHeaders = new HashMap <>(this .responseHeaders );
447491 final List <String > existingValues = newResponseHeaders .get (key );
448-
449492 if (existingValues != null ) {
450493 final Set <String > existingUniqueValues = existingValues .stream ().map (uniqueValue ).collect (Collectors .toSet ());
451494 assert existingValues .size () == existingUniqueValues .size ();
452495 if (existingUniqueValues .contains (uniqueValue .apply (value ))) {
453496 return this ;
454497 }
455-
456498 final List <String > newValues = new ArrayList <>(existingValues );
457499 newValues .add (value );
458-
459500 newResponseHeaders .put (key , Collections .unmodifiableList (newValues ));
460501 } else {
461502 newResponseHeaders .put (key , Collections .singletonList (value ));
462503 }
463504
464- return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext );
505+ //check if we can add another warning header - if max count within limits
506+ if ((key .equals ("Warning" )) && (maxWarningHeaderCount != -1 )) { //if count is NOT unbounded, check its limits
507+ final int warningHeaderCount = newResponseHeaders .containsKey ("Warning" ) ? newResponseHeaders .get ("Warning" ).size () : 0 ;
508+ if (warningHeaderCount > maxWarningHeaderCount ) {
509+ final String message = "Dropping a warning header, as their total count reached the maximum allowed of [" +
510+ maxWarningHeaderCount + "] set in [" + HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_COUNT .getKey () + "]!" ;
511+ ESLoggerFactory .getLogger (ThreadContext .class ).warn (message );
512+ return this ;
513+ }
514+ }
515+ return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext , newWarningHeaderSize );
465516 }
466517
518+
467519 private ThreadContextStruct putTransient (String key , Object value ) {
468520 Map <String , Object > newTransient = new HashMap <>(this .transientHeaders );
469521 if (newTransient .putIfAbsent (key , value ) != null ) {
0 commit comments