Commit 302f8d5
Optimize warning header de-duplication (#37725)
Now that warning headers no longer contain a timestamp of when the warning was generated, we no longer need to extract the warning value from the warning header to determine whether or not the warning value is duplicated. Instead, we can compare strings directly. Further, when de-duplicating warning headers, we are constantly rebuilding sets. Instead of doing that, we can carry the set with us and rebuild it only when we find a new warning value. This commit applies both of these optimizations.
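As an illustration of the second optimization, here is a minimal, self-contained sketch of the idea: keep the response-header values in an insertion-ordered set, answer the duplicate question with a plain contains check on the stored strings, and copy the set only when a genuinely new value arrives. The ResponseHeaderStore class and addWarning method below are hypothetical names for illustration, not the ThreadContext code from the diff.

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: carry a de-duplicated set of header values instead of
// rebuilding a set from a list on every addition.
final class ResponseHeaderStore {

    // header name -> insertion-ordered, de-duplicated values
    private Map<String, Set<String>> responseHeaders = new HashMap<>();

    void addWarning(final String value) {
        final Set<String> existing = responseHeaders.get("Warning");
        if (existing != null && existing.contains(value)) {
            return; // duplicate value: nothing is rebuilt, the existing set is reused
        }
        // only a new value triggers copying the set (and the enclosing map)
        final Set<String> values = existing == null ? new LinkedHashSet<>() : new LinkedHashSet<>(existing);
        values.add(value);
        final Map<String, Set<String>> newHeaders = new HashMap<>(responseHeaders);
        newHeaders.put("Warning", Collections.unmodifiableSet(values));
        responseHeaders = newHeaders;
    }
}

The contains check works on the stored header strings themselves, which is exactly what removing the timestamp from the warning header value makes possible.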
1 parent 6c819a5 commit 302f8d5

2 files changed, +82 -27 lines
server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java

Lines changed: 1 addition & 1 deletion
@@ -231,7 +231,7 @@ void deprecated(final Set<ThreadContext> threadContexts, final String message, f
         while (iterator.hasNext()) {
             try {
                 final ThreadContext next = iterator.next();
-                next.addResponseHeader("Warning", warningHeaderValue, DeprecationLogger::extractWarningValueFromWarningHeader);
+                next.addResponseHeader("Warning", warningHeaderValue);
             } catch (final IllegalStateException e) {
                 // ignored; it should be removed shortly
             }
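Since the warning header value no longer embeds a timestamp, the stored value is itself the de-duplication key, which is why the call above can drop the DeprecationLogger::extractWarningValueFromWarningHeader argument. A small, purely illustrative comparison of the two approaches (the class and helpers are hypothetical, and the header handling is simplified rather than the exact Elasticsearch Warning syntax):

import java.util.Objects;
import java.util.function.Function;

final class WarningDeDupSketch {

    // Old style: the stored value embedded a timestamp, so a normalizing
    // function had to be applied before two values could be compared.
    static boolean isDuplicate(String existing, String candidate, Function<String, String> extractValue) {
        return Objects.equals(extractValue.apply(existing), extractValue.apply(candidate));
    }

    // New style: the stored value is already canonical, so plain string
    // equality (or Set.contains on the stored strings) is enough.
    static boolean isDuplicate(String existing, String candidate) {
        return existing.equals(candidate);
    }
}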

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 81 additions & 26 deletions
@@ -36,14 +36,18 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import java.util.stream.Collector;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
@@ -258,11 +262,11 @@ public Map<String, String> getHeaders() {
      * @return Never {@code null}.
      */
     public Map<String, List<String>> getResponseHeaders() {
-        Map<String, List<String>> responseHeaders = threadLocal.get().responseHeaders;
+        Map<String, Set<String>> responseHeaders = threadLocal.get().responseHeaders;
         HashMap<String, List<String>> map = new HashMap<>(responseHeaders.size());
 
-        for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
-            map.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        for (Map.Entry<String, Set<String>> entry : responseHeaders.entrySet()) {
+            map.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
         }
 
         return Collections.unmodifiableMap(map);
@@ -405,7 +409,7 @@ default void restore() {
     private static final class ThreadContextStruct {
         private final Map<String, String> requestHeaders;
         private final Map<String, Object> transientHeaders;
-        private final Map<String, List<String>> responseHeaders;
+        private final Map<String, Set<String>> responseHeaders;
         private final boolean isSystemContext;
         private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
         private ThreadContextStruct(StreamInput in) throws IOException {
@@ -416,7 +420,23 @@ private ThreadContextStruct(StreamInput in) throws IOException {
             }
 
             this.requestHeaders = requestHeaders;
-            this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+            this.responseHeaders = in.readMap(StreamInput::readString, input -> {
+                final int size = input.readVInt();
+                if (size == 0) {
+                    return Collections.emptySet();
+                } else if (size == 1) {
+                    return Collections.singleton(input.readString());
+                } else {
+                    // use a linked hash set to preserve order
+                    final LinkedHashSet<String> values = new LinkedHashSet<>(size);
+                    for (int i = 0; i < size; i++) {
+                        final String value = input.readString();
+                        final boolean added = values.add(value);
+                        assert added : value;
+                    }
+                    return values;
+                }
+            });
             this.transientHeaders = Collections.emptyMap();
             isSystemContext = false; // we never serialize this it's a transient flag
             this.warningHeadersSize = 0L;
@@ -430,7 +450,7 @@ private ThreadContextStruct setSystemContext() {
         }
 
         private ThreadContextStruct(Map<String, String> requestHeaders,
-                                    Map<String, List<String>> responseHeaders,
+                                    Map<String, Set<String>> responseHeaders,
                                     Map<String, Object> transientHeaders, boolean isSystemContext) {
             this.requestHeaders = requestHeaders;
             this.responseHeaders = responseHeaders;
@@ -440,7 +460,7 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
         }
 
         private ThreadContextStruct(Map<String, String> requestHeaders,
-                                    Map<String, List<String>> responseHeaders,
+                                    Map<String, Set<String>> responseHeaders,
                                     Map<String, Object> transientHeaders, boolean isSystemContext,
                                     long warningHeadersSize) {
             this.requestHeaders = requestHeaders;
@@ -481,19 +501,19 @@ private ThreadContextStruct putHeaders(Map<String, String> headers) {
             }
         }
 
-        private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers) {
+        private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers) {
             assert headers != null;
             if (headers.isEmpty()) {
                 return this;
             }
-            final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
-            for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
+            final Map<String, Set<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
+            for (Map.Entry<String, Set<String>> entry : headers.entrySet()) {
                 String key = entry.getKey();
-                final List<String> existingValues = newResponseHeaders.get(key);
+                final Set<String> existingValues = newResponseHeaders.get(key);
                 if (existingValues != null) {
-                    List<String> newValues = Stream.concat(entry.getValue().stream(),
-                        existingValues.stream()).distinct().collect(Collectors.toList());
-                    newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
+                    final Set<String> newValues =
+                            Stream.concat(entry.getValue().stream(), existingValues.stream()).collect(LINKED_HASH_SET_COLLECTOR);
+                    newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
                 } else {
                     newResponseHeaders.put(key, entry.getValue());
                 }
@@ -523,20 +543,19 @@ private ThreadContextStruct putResponse(final String key, final String value, fi
                 }
             }
 
-            final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
-            final List<String> existingValues = newResponseHeaders.get(key);
+            final Map<String, Set<String>> newResponseHeaders;
+            final Set<String> existingValues = responseHeaders.get(key);
             if (existingValues != null) {
-                final Set<String> existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet());
-                assert existingValues.size() == existingUniqueValues.size() :
-                    "existing values: [" + existingValues + "], existing unique values [" + existingUniqueValues + "]";
-                if (existingUniqueValues.contains(uniqueValue.apply(value))) {
+                if (existingValues.contains(uniqueValue.apply(value))) {
                     return this;
                 }
-                final List<String> newValues = new ArrayList<>(existingValues);
-                newValues.add(value);
-                newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
+                // preserve insertion order
+                final Set<String> newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
+                newResponseHeaders = new HashMap<>(responseHeaders);
+                newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
             } else {
-                newResponseHeaders.put(key, Collections.singletonList(value));
+                newResponseHeaders = new HashMap<>(responseHeaders);
+                newResponseHeaders.put(key, Collections.singleton(value));
             }
 
             //check if we can add another warning header - if max count within limits
@@ -588,7 +607,7 @@ private void writeTo(StreamOutput out, Map<String, String> defaultHeaders) throw
                 out.writeString(entry.getValue());
             }
 
-            out.writeMapOfLists(responseHeaders, StreamOutput::writeString, StreamOutput::writeString);
+            out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection);
         }
     }
 
@@ -751,4 +770,40 @@ public AbstractRunnable unwrap() {
             return in;
         }
     }
+
+    private static final Collector<String, Set<String>, Set<String>> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>();
+
+    private static class LinkedHashSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
+        @Override
+        public Supplier<Set<T>> supplier() {
+            return LinkedHashSet::new;
+        }
+
+        @Override
+        public BiConsumer<Set<T>, T> accumulator() {
+            return Set::add;
+        }
+
+        @Override
+        public BinaryOperator<Set<T>> combiner() {
+            return (left, right) -> {
+                left.addAll(right);
+                return left;
+            };
+        }
+
+        @Override
+        public Function<Set<T>, Set<T>> finisher() {
+            return Function.identity();
+        }
+
+        private static final Set<Characteristics> CHARACTERISTICS =
+                Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
+
+        @Override
+        public Set<Characteristics> characteristics() {
+            return CHARACTERISTICS;
+        }
+    }
+
 }
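The LinkedHashSetCollector added above behaves like Collectors.toCollection(LinkedHashSet::new): values are collected into a LinkedHashSet, so duplicates are dropped while first-seen order is preserved. Below is a standalone sketch of that behavior, mirroring the Stream.concat pattern in putResponseHeaders; the class name and sample values are illustrative only, and the JDK collector stands in for the private LINKED_HASH_SET_COLLECTOR constant.

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class LinkedHashSetCollectorDemo {
    public static void main(String[] args) {
        // Existing de-duplicated header values, in insertion order.
        Set<String> existing = Stream.of("warn-a", "warn-b")
                .collect(Collectors.toCollection(LinkedHashSet::new));

        // Merging the way putResponseHeaders does: concatenate the streams and
        // re-collect. "warn-a" is already present, so it is kept once and the
        // first-seen order survives.
        Set<String> merged = Stream.concat(existing.stream(), Stream.of("warn-c", "warn-a"))
                .collect(Collectors.toCollection(LinkedHashSet::new));

        System.out.println(merged); // [warn-a, warn-b, warn-c]
    }
}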
