Skip to content

Commit 5757912

Browse files
authored
Basic thread safety for ValuesSourceRegistry (#50340)
1 parent 2aeaedb commit 5757912

File tree

6 files changed

+120
-31
lines changed

6 files changed

+120
-31
lines changed

server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
4242
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
4343
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
44+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
4445
import org.elasticsearch.search.fetch.FetchSubPhase;
4546
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
4647
import org.elasticsearch.search.rescore.Rescorer;
@@ -54,6 +55,7 @@
5455
import java.util.Map;
5556
import java.util.TreeMap;
5657
import java.util.function.BiFunction;
58+
import java.util.function.Consumer;
5759

5860
import static java.util.Collections.emptyList;
5961
import static java.util.Collections.emptyMap;
@@ -250,6 +252,7 @@ public QuerySpec(String name, Writeable.Reader<T> reader, QueryParser<T> parser)
250252
*/
251253
class AggregationSpec extends SearchExtensionSpec<AggregationBuilder, Aggregator.Parser> {
252254
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
255+
private Consumer<ValuesSourceRegistry> aggregatorRegistrar;
253256

254257
/**
255258
* Specification for an {@link Aggregation}.
@@ -300,6 +303,23 @@ public AggregationSpec addResultReader(String writeableName, Writeable.Reader<?
300303
public Map<String, Writeable.Reader<? extends InternalAggregation>> getResultReaders() {
301304
return resultReaders;
302305
}
306+
307+
/**
308+
* Get the function to register the {@link org.elasticsearch.search.aggregations.support.ValuesSource} to aggregator mappings for
309+
* this aggregation
310+
*/
311+
public Consumer<ValuesSourceRegistry> getAggregatorRegistrar() {
312+
return aggregatorRegistrar;
313+
}
314+
315+
/**
316+
* Set the function to register the {@link org.elasticsearch.search.aggregations.support.ValuesSource} to aggregator mappings for
317+
* this aggregation
318+
*/
319+
public AggregationSpec setAggregatorRegistrar(Consumer<ValuesSourceRegistry> aggregatorRegistrar) {
320+
this.aggregatorRegistrar = aggregatorRegistrar;
321+
return this;
322+
}
303323
}
304324

305325
/**

server/src/main/java/org/elasticsearch/search/SearchModule.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@
228228
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregator;
229229
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
230230
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
231+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
231232
import org.elasticsearch.search.fetch.FetchPhase;
232233
import org.elasticsearch.search.fetch.FetchSubPhase;
233234
import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase;
@@ -400,7 +401,8 @@ private void registerAggregations(List<SearchPlugin> plugins) {
400401
registerAggregation(new AggregationSpec(IpRangeAggregationBuilder.NAME, IpRangeAggregationBuilder::new,
401402
IpRangeAggregationBuilder::parse).addResultReader(InternalBinaryRange::new));
402403
registerAggregation(new AggregationSpec(HistogramAggregationBuilder.NAME, HistogramAggregationBuilder::new,
403-
HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new));
404+
HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new)
405+
.setAggregatorRegistrar(HistogramAggregationBuilder::registerAggregators));
404406
registerAggregation(new AggregationSpec(DateHistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder::new,
405407
DateHistogramAggregationBuilder::parse).addResultReader(InternalDateHistogram::new));
406408
registerAggregation(new AggregationSpec(AutoDateHistogramAggregationBuilder.NAME, AutoDateHistogramAggregationBuilder::new,
@@ -440,6 +442,10 @@ private void registerAggregation(AggregationSpec spec) {
440442
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
441443
namedWriteables.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, writeableName, internalReader));
442444
}
445+
Consumer<ValuesSourceRegistry> register = spec.getAggregatorRegistrar();
446+
if (register != null) {
447+
register.accept(ValuesSourceRegistry.getInstance());
448+
}
443449
}
444450

445451
private void registerPipelineAggregations(List<SearchPlugin> plugins) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregationBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@
4040
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
4141
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
4242
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
43+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
4344
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
4445

4546
import java.io.IOException;
4647
import java.util.List;
4748
import java.util.Map;
4849
import java.util.Objects;
50+
import java.util.concurrent.atomic.AtomicBoolean;
4951

5052
/**
5153
* A builder for histograms on numeric fields. This builder can operate on either base numeric fields, or numeric range fields. IP range
@@ -88,6 +90,13 @@ public static HistogramAggregationBuilder parse(String aggregationName, XContent
8890
return PARSER.parse(parser, new HistogramAggregationBuilder(aggregationName), null);
8991
}
9092

93+
private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
94+
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
95+
if (wasRegistered.compareAndSet(false, true) == true) {
96+
HistogramAggregatorFactory.registerAggregators(valuesSourceRegistry);
97+
}
98+
}
99+
91100
private double interval;
92101
private double offset = 0;
93102
private double minBound = Double.POSITIVE_INFINITY;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ public final class HistogramAggregatorFactory extends ValuesSourceAggregatorFact
5555
private final double minBound, maxBound;
5656

5757
// TODO: Registration should happen on the actual aggregator classes, but I don't want to set up the whole dynamic loading thing yet
58-
static {
59-
ValuesSourceRegistry.getInstance().register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.RANGE,
58+
static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
59+
valuesSourceRegistry.register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.RANGE,
6060
new HistogramAggregatorSupplier() {
6161
@Override
6262
public Aggregator build(String name, AggregatorFactories factories, double interval, double offset,
@@ -76,7 +76,7 @@ public Aggregator build(String name, AggregatorFactories factories, double inter
7676
(fieldType, indexFieldData) -> fieldType instanceof RangeFieldMapper.RangeFieldType
7777
);
7878

79-
ValuesSourceRegistry.getInstance().register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC,
79+
valuesSourceRegistry.register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC,
8080
new HistogramAggregatorSupplier() {
8181
@Override
8282
public Aggregator build(String name, AggregatorFactories factories, double interval, double offset,

server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceRegistry.java

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@
2727
import org.elasticsearch.search.aggregations.AggregationExecutionException;
2828

2929
import java.util.AbstractMap;
30-
import java.util.ArrayList;
31-
import java.util.HashMap;
3230
import java.util.List;
3331
import java.util.Map;
32+
import java.util.StringJoiner;
3433
import java.util.function.BiFunction;
3534

3635
/*
@@ -39,49 +38,76 @@
3938
*/
4039
public enum ValuesSourceRegistry {
4140
INSTANCE {
42-
Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry = new HashMap<>();
41+
Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry = Map.of();
4342
// We use a List of Entries here to approximate an ordered map
4443
Map<String, List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>>> resolverRegistry
45-
= new HashMap<>();
44+
= Map.of();
4645

46+
/**
47+
* Threading behavior notes: This call is both synchronized and expensive. It copies the entire existing mapping structure each
48+
* time it is invoked. We expect that register will be called a small number of times during startup only (as plugins are being
49+
* registered) and we can tolerate the cost at that time. Once all plugins are registered, we should never need to call register
50+
* again. Comparatively, we expect to do many reads from the registry data structures, and those reads may be interleaved on
51+
* different worker threads. Thus we want to optimize the read case to be thread safe and fast, which the immutable
52+
* collections do well. Using immutable collections requires a copy on write mechanic, thus the somewhat non-intuitive
53+
* implementation of this method.
54+
*
55+
* @param aggregationName The name of the family of aggregations, typically found via ValuesSourceAggregationBuilder.getType()
56+
* @param valuesSourceType The ValuesSourceType this mapping applies to.
57+
* @param aggregatorSupplier An Aggregation-specific specialization of AggregatorSupplier which will construct the mapped aggregator
58+
* from the aggregation standard set of parameters
59+
* @param resolveValuesSourceType A predicate operating on MappedFieldType and IndexFieldData instances which decides if the mapped
60+
*/
4761
@Override
48-
public void register(String aggregationName, ValuesSourceType valuesSourceType,AggregatorSupplier aggregatorSupplier,
62+
public synchronized void register(String aggregationName, ValuesSourceType valuesSourceType, AggregatorSupplier aggregatorSupplier,
4963
BiFunction<MappedFieldType, IndexFieldData, Boolean> resolveValuesSourceType) {
50-
if (resolverRegistry.containsKey(aggregationName) == false) {
51-
resolverRegistry.put(aggregationName, new ArrayList<>());
64+
// Aggregator registry block - do this first in case we need to throw on duplicate registration
65+
Map<ValuesSourceType, AggregatorSupplier> innerMap;
66+
if (aggregatorRegistry.containsKey(aggregationName)) {
67+
if (aggregatorRegistry.get(aggregationName).containsKey(valuesSourceType)) {
68+
throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", "
69+
+ valuesSourceType.toString() + "]");
70+
}
71+
innerMap = copyAndAdd(aggregatorRegistry.get(aggregationName),
72+
new AbstractMap.SimpleEntry<>(valuesSourceType, aggregatorSupplier));
73+
} else {
74+
innerMap = Map.of(valuesSourceType, aggregatorSupplier);
5275
}
53-
List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>> resolverList
54-
= resolverRegistry.get(aggregationName);
55-
resolverList.add(new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType));
76+
aggregatorRegistry = copyAndAdd(aggregatorRegistry, new AbstractMap.SimpleEntry<>(aggregationName, innerMap));
5677

57-
if (aggregatorRegistry.containsKey(aggregationName) == false) {
58-
aggregatorRegistry.put(aggregationName, new HashMap<>());
59-
}
60-
Map<ValuesSourceType, AggregatorSupplier> innerMap = aggregatorRegistry.get(aggregationName);
61-
if (innerMap.containsKey(valuesSourceType)) {
62-
throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", "
63-
+ valuesSourceType.toString() + "]");
78+
// Resolver registry block
79+
AbstractMap.SimpleEntry[] mappings;
80+
if (resolverRegistry.containsKey(aggregationName)) {
81+
List currentMappings = resolverRegistry.get(aggregationName);
82+
mappings = (AbstractMap.SimpleEntry[]) currentMappings.toArray(new AbstractMap.SimpleEntry[currentMappings.size() + 1]);
83+
} else {
84+
mappings = new AbstractMap.SimpleEntry[1];
6485
}
65-
innerMap.put(valuesSourceType, aggregatorSupplier);
86+
mappings[mappings.length - 1] = new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType);
87+
resolverRegistry = copyAndAdd(resolverRegistry,new AbstractMap.SimpleEntry<>(aggregationName, List.of(mappings)));
6688
}
6789

6890
@Override
6991
public AggregatorSupplier getAggregator(ValuesSourceType valuesSourceType, String aggregationName) {
70-
if (aggregatorRegistry.containsKey(aggregationName)) {
92+
StringJoiner validSourceTypes = new StringJoiner(",", "[", "]");
93+
if (aggregationName != null && aggregatorRegistry.containsKey(aggregationName)) {
7194
Map<ValuesSourceType, AggregatorSupplier> innerMap = aggregatorRegistry.get(aggregationName);
72-
if (innerMap.containsKey(valuesSourceType)) {
95+
if (valuesSourceType != null && innerMap.containsKey(valuesSourceType)) {
7396
return innerMap.get(valuesSourceType);
7497
}
98+
for (ValuesSourceType validVST : innerMap.keySet()) {
99+
validSourceTypes.add(validVST.toString());
100+
}
101+
throw new AggregationExecutionException("ValuesSource type " + valuesSourceType.toString() +
102+
" is not supported for aggregation" + aggregationName + ". Valid choices are " + validSourceTypes.toString());
75103
}
76-
// TODO: Error message should list valid ValuesSource types
77-
throw new AggregationExecutionException("ValuesSource type " + valuesSourceType.toString() +
78-
" is not supported for aggregation" + aggregationName);
104+
throw new AggregationExecutionException("Unregistered Aggregation [" + aggregationName + "]");
79105
}
80106

81107
@Override
82108
public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFieldData indexFieldData, String aggregationName,
83109
ValueType valueType) {
84-
if (resolverRegistry.containsKey(aggregationName)) {
110+
if (aggregationName != null && resolverRegistry.containsKey(aggregationName)) {
85111
List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>> resolverList
86112
= resolverRegistry.get(aggregationName);
87113
for (AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType> entry : resolverList) {
@@ -90,8 +116,9 @@ public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFiel
90116
return entry.getValue();
91117
}
92118
}
93-
// TODO: Error message should list valid field types; not sure fieldType.toString() is the best choice.
94-
throw new IllegalArgumentException("Field type " + fieldType.toString() + " is not supported for aggregation "
119+
// TODO: Error message should list valid field types
120+
String fieldDescription = fieldType.name() + "(" + fieldType.toString() + ")";
121+
throw new IllegalArgumentException("Field type " + fieldDescription + " is not supported for aggregation "
95122
+ aggregationName);
96123
} else {
97124
// TODO: Legacy resolve logic; remove this after converting all aggregations to the new system
@@ -133,4 +160,26 @@ public abstract ValuesSourceType getValuesSourceType(MappedFieldType fieldType,
133160
ValueType valueType);
134161

135162
public static ValuesSourceRegistry getInstance() {return INSTANCE;}
163+
164+
private static <K, V> Map copyAndAdd(Map<K, V> source, Map.Entry<K, V> newValue) {
165+
Map.Entry[] entries;
166+
if (source.containsKey(newValue.getKey())) {
167+
// Replace with new value
168+
entries = new Map.Entry[source.size()];
169+
int i = 0;
170+
for (Map.Entry entry : source.entrySet()) {
171+
if (entry.getKey() == newValue.getKey()) {
172+
entries[i] = newValue;
173+
} else {
174+
entries[i] = entry;
175+
}
176+
i++;
177+
}
178+
} else {
179+
entries = source.entrySet().toArray(new Map.Entry[source.size() + 1]);
180+
entries[entries.length - 1] = newValue;
181+
}
182+
return Map.ofEntries(entries);
183+
}
184+
136185
}

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
6666
import org.elasticsearch.mock.orig.Mockito;
6767
import org.elasticsearch.script.ScriptService;
68+
import org.elasticsearch.search.SearchModule;
6869
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
6970
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
7071
import org.elasticsearch.search.fetch.FetchPhase;
@@ -76,6 +77,7 @@
7677
import org.elasticsearch.test.ESTestCase;
7778
import org.elasticsearch.test.InternalAggregationTestCase;
7879
import org.junit.After;
80+
import org.junit.BeforeClass;
7981

8082
import java.io.IOException;
8183
import java.util.ArrayList;
@@ -123,8 +125,11 @@ private static void registerFieldTypes(SearchContext searchContext, MapperServic
123125
when(mapperService.fullName(fieldName)).thenReturn(fieldType);
124126
when(searchContext.smartNameFieldType(fieldName)).thenReturn(fieldType);
125127
}
128+
}
126129

127-
130+
@BeforeClass
131+
public static void initValuesSourceRegistry() {
132+
new SearchModule(Settings.EMPTY, List.of());
128133
}
129134

130135
protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,

0 commit comments

Comments
 (0)