Skip to content

Commit be09f8b

Browse files
authored
Handle fields.with.dots in routing_path (#83148)
Our document parsing code treats: ``` {"foo.bar": "baz"} ``` the same as: ``` {"foo": {"bar": "baz"}} ``` but the code that extracted routing_path didn't! This is bad because routing_path has to identify a subset of dimensions precisely and in the same way that our mapping code identifies the dimensions. They have to line up or two time series could end up on different shards. Sad times. This makes `routing_path` interpret dots in field names as though they were an object. It creates an option for the includes/excludes filters on `XContentParser` to treat dots as objects. So the filter would see ``` {"foo.bar": "baz"} ``` as though it were: ``` {"foo": {"bar": "baz"}} ``` So the filter `foo.bar` would match both documents. This is part of #82511.
1 parent 823281c commit be09f8b

File tree

4 files changed

+87
-63
lines changed

4 files changed

+87
-63
lines changed

build-tools-internal/src/main/resources/changelog-schema.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
"TLS",
8282
"Task Management",
8383
"Transform",
84+
"TSDB",
8485
"Watcher"
8586
]
8687
},

docs/changelog/83148.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 83148
2+
summary: Handle `fields.with.dots` in `routing_path`
3+
area: TSDB
4+
type: feature
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.cluster.routing;
1010

11+
import org.apache.lucene.util.BytesRef;
12+
import org.apache.lucene.util.StringHelper;
1113
import org.elasticsearch.action.RoutingMissingException;
1214
import org.elasticsearch.cluster.metadata.IndexMetadata;
1315
import org.elasticsearch.cluster.metadata.MappingMetadata;
@@ -23,7 +25,7 @@
2325
import java.io.IOException;
2426
import java.util.ArrayList;
2527
import java.util.Collections;
26-
import java.util.Comparator;
28+
import java.util.Iterator;
2729
import java.util.List;
2830
import java.util.Set;
2931
import java.util.function.IntConsumer;
@@ -224,66 +226,78 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
224226
}
225227
assert Transports.assertNotTransportThread("parsing the _source can get slow");
226228

229+
List<NameAndHash> hashes = new ArrayList<>();
227230
try {
228231
try (XContentParser parser = sourceType.xContent().createParser(parserConfig, source.streamInput())) {
229232
parser.nextToken(); // Move to first token
230233
if (parser.currentToken() == null) {
231234
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
232235
}
233-
int hash = extractObject(parser);
236+
parser.nextToken();
237+
extractObject(hashes, null, parser);
234238
ensureExpectedToken(null, parser.nextToken(), parser);
235-
return hashToShardId(hash);
236239
}
237240
} catch (IOException | ParsingException e) {
238241
throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e);
239242
}
243+
return hashToShardId(hashesToHash(hashes));
240244
}
241245

242-
private static int extractObject(XContentParser source) throws IOException {
243-
ensureExpectedToken(Token.FIELD_NAME, source.nextToken(), source);
244-
String firstFieldName = source.currentName();
245-
source.nextToken();
246-
int firstHash = extractItem(source);
247-
if (source.currentToken() == Token.END_OBJECT) {
248-
// Just one routing key in this object
249-
// Use ^ like Map.Entry's hashcode
250-
return Murmur3HashFunction.hash(firstFieldName) ^ firstHash;
251-
}
252-
List<NameAndHash> hashes = new ArrayList<>();
253-
hashes.add(new NameAndHash(firstFieldName, firstHash));
254-
do {
246+
private static void extractObject(List<NameAndHash> hashes, @Nullable String path, XContentParser source) throws IOException {
247+
while (source.currentToken() != Token.END_OBJECT) {
255248
ensureExpectedToken(Token.FIELD_NAME, source.currentToken(), source);
256249
String fieldName = source.currentName();
250+
String subPath = path == null ? fieldName : path + "." + fieldName;
257251
source.nextToken();
258-
hashes.add(new NameAndHash(fieldName, extractItem(source)));
259-
} while (source.currentToken() != Token.END_OBJECT);
260-
Collections.sort(hashes, Comparator.comparing(nameAndHash -> nameAndHash.name));
261-
/*
262-
* This is the same as Arrays.hash(Map.Entry<fieldName, hash>) but we're
263-
* writing it out so for extra paranoia. Changing this will change how
264-
* documents are routed and we don't want a jdk update that modifies Arrays
265-
* or Map.Entry to sneak up on us.
266-
*/
267-
int hash = 0;
268-
for (NameAndHash nameAndHash : hashes) {
269-
int thisHash = Murmur3HashFunction.hash(nameAndHash.name) ^ nameAndHash.hash;
270-
hash = 31 * hash + thisHash;
252+
extractItem(hashes, subPath, source);
271253
}
272-
return hash;
273254
}
274255

275-
private static int extractItem(XContentParser source) throws IOException {
276-
if (source.currentToken() == Token.START_OBJECT) {
277-
int hash = extractObject(source);
278-
source.nextToken();
279-
return hash;
256+
private static void extractItem(List<NameAndHash> hashes, String path, XContentParser source) throws IOException {
257+
switch (source.currentToken()) {
258+
case START_OBJECT:
259+
source.nextToken();
260+
extractObject(hashes, path, source);
261+
source.nextToken();
262+
break;
263+
case VALUE_STRING:
264+
hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
265+
source.nextToken();
266+
break;
267+
case VALUE_NULL:
268+
source.nextToken();
269+
break;
270+
default:
271+
throw new ParsingException(
272+
source.getTokenLocation(),
273+
"Routing values must be strings but found [{}]",
274+
source.currentToken()
275+
);
280276
}
281-
if (source.currentToken() == Token.VALUE_STRING) {
282-
int hash = Murmur3HashFunction.hash(source.text());
283-
source.nextToken();
284-
return hash;
277+
}
278+
279+
private static int hash(BytesRef ref) {
280+
return StringHelper.murmurhash3_x86_32(ref, 0);
281+
}
282+
283+
private static int hashesToHash(List<NameAndHash> hashes) {
284+
Collections.sort(hashes);
285+
Iterator<NameAndHash> itr = hashes.iterator();
286+
if (itr.hasNext() == false) {
287+
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
288+
}
289+
NameAndHash prev = itr.next();
290+
int hash = hash(prev.name) ^ prev.hash;
291+
while (itr.hasNext()) {
292+
NameAndHash next = itr.next();
293+
if (prev.name.equals(next.name)) {
294+
throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
295+
}
296+
int thisHash = hash(next.name) ^ next.hash;
297+
hash = 31 * hash + thisHash;
298+
prev = next;
285299
}
286-
throw new ParsingException(source.getTokenLocation(), "Routing values must be strings but found [{}]", source.currentToken());
300+
return hash;
287301
}
288302

289303
@Override
@@ -316,5 +330,10 @@ private String error(String operation) {
316330
}
317331
}
318332

319-
private record NameAndHash(String name, int hash) {}
333+
private static record NameAndHash(BytesRef name, int hash) implements Comparable<NameAndHash> {
334+
@Override
335+
public int compareTo(NameAndHash o) {
336+
return name.compareTo(o.name);
337+
}
338+
}
320339
}

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88
package org.elasticsearch.cluster.routing;
99

10+
import org.apache.lucene.util.BytesRef;
11+
import org.apache.lucene.util.StringHelper;
1012
import org.elasticsearch.Version;
1113
import org.elasticsearch.action.RoutingMissingException;
1214
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -513,7 +515,7 @@ public void testRoutingPathOneSub() throws IOException {
513515
assertIndexShard(
514516
routing,
515517
Map.of("foo", Map.of("bar", "cat"), "baz", "dog"),
516-
Math.floorMod(hash(List.of("foo", List.of("bar", "cat"))), shards)
518+
Math.floorMod(hash(List.of("foo.bar", "cat")), shards)
517519
);
518520
}
519521

@@ -523,10 +525,16 @@ public void testRoutingPathManySubs() throws IOException {
523525
assertIndexShard(
524526
routing,
525527
Map.of("foo", Map.of("a", "cat"), "bar", Map.of("thing", "yay", "this", "too")),
526-
Math.floorMod(hash(List.of("bar", List.of("thing", "yay", "this", "too"), "foo", List.of("a", "cat"))), shards)
528+
Math.floorMod(hash(List.of("bar.thing", "yay", "bar.this", "too", "foo.a", "cat")), shards)
527529
);
528530
}
529531

532+
public void testRoutingPathDotInName() throws IOException {
533+
int shards = between(2, 1000);
534+
IndexRouting routing = indexRoutingForPath(shards, "foo.bar");
535+
assertIndexShard(routing, Map.of("foo.bar", "cat", "baz", "dog"), Math.floorMod(hash(List.of("foo.bar", "cat")), shards));
536+
}
537+
530538
public void testRoutingPathBwc() throws IOException {
531539
Version version = VersionUtils.randomIndexCompatibleVersion(random());
532540
IndexRouting routing = indexRoutingForPath(version, 8, "dim.*,other.*,top");
@@ -538,12 +546,13 @@ public void testRoutingPathBwc() throws IOException {
538546
* versions of Elasticsearch must continue to route based on the
539547
* version on the index.
540548
*/
541-
assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 0);
549+
assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 4);
542550
assertIndexShard(routing, Map.of("dim", Map.of("a", "b")), 5);
543551
assertIndexShard(routing, Map.of("dim", Map.of("c", "d")), 4);
544-
assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 5);
545-
assertIndexShard(routing, Map.of("top", "a"), 3);
546-
assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 2);
552+
assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 7);
553+
assertIndexShard(routing, Map.of("top", "a"), 5);
554+
assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 0);
555+
assertIndexShard(routing, Map.of("dim.a", "a"), 4);
547556
}
548557

549558
private IndexRouting indexRoutingForPath(int shards, String path) {
@@ -560,8 +569,8 @@ private IndexRouting indexRoutingForPath(Version createdVersion, int shards, Str
560569
);
561570
}
562571

563-
private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int id) throws IOException {
564-
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(id));
572+
private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int expected) throws IOException {
573+
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(expected));
565574
}
566575

567576
private BytesReference source(Map<String, Object> doc) throws IOException {
@@ -581,24 +590,14 @@ private BytesReference source(Map<String, Object> doc) throws IOException {
581590
/**
582591
* Build the hash we expect from the extracter.
583592
*/
584-
private int hash(List<?> keysAndValues) {
593+
private int hash(List<String> keysAndValues) {
585594
assertThat(keysAndValues.size() % 2, equalTo(0));
586595
int hash = 0;
587596
for (int i = 0; i < keysAndValues.size(); i += 2) {
588-
int thisHash = Murmur3HashFunction.hash(keysAndValues.get(i).toString()) ^ expectedValueHash(keysAndValues.get(i + 1));
589-
hash = hash * 31 + thisHash;
597+
int keyHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i)), 0);
598+
int valueHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i + 1)), 0);
599+
hash = hash * 31 + (keyHash ^ valueHash);
590600
}
591601
return hash;
592602
}
593-
594-
private int expectedValueHash(Object value) {
595-
if (value instanceof List) {
596-
return hash((List<?>) value);
597-
}
598-
if (value instanceof String) {
599-
return Murmur3HashFunction.hash((String) value);
600-
}
601-
throw new IllegalArgumentException("Unsupported value: " + value);
602-
}
603-
604603
}

0 commit comments

Comments
 (0)