diff --git a/build-tools-internal/src/main/resources/changelog-schema.json b/build-tools-internal/src/main/resources/changelog-schema.json index 5fb237cd7c964..9d07067fdd26b 100644 --- a/build-tools-internal/src/main/resources/changelog-schema.json +++ b/build-tools-internal/src/main/resources/changelog-schema.json @@ -80,6 +80,7 @@ "TLS", "Task Management", "Transform", + "TSDB", "Watcher" ] }, diff --git a/docs/changelog/83148.yaml b/docs/changelog/83148.yaml new file mode 100644 index 0000000000000..0823842d7ba29 --- /dev/null +++ b/docs/changelog/83148.yaml @@ -0,0 +1,5 @@ +pr: 83148 +summary: Handle `fields.with.dots` in `routing_path` +area: TSDB +type: feature +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java index 64a2a2b093468..8bf20281844f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java @@ -8,6 +8,8 @@ package org.elasticsearch.cluster.routing; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.StringHelper; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -23,7 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.function.IntConsumer; @@ -224,66 +226,78 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy } assert Transports.assertNotTransportThread("parsing the _source can get slow"); + List hashes = new ArrayList<>(); try { try (XContentParser parser = sourceType.xContent().createParser(parserConfig, source.streamInput())) { parser.nextToken(); // Move to first token if (parser.currentToken() == null) { throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields"); } - int hash = extractObject(parser); + parser.nextToken(); + extractObject(hashes, null, parser); ensureExpectedToken(null, parser.nextToken(), parser); - return hashToShardId(hash); } } catch (IOException | ParsingException e) { throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e); } + return hashToShardId(hashesToHash(hashes)); } - private static int extractObject(XContentParser source) throws IOException { - ensureExpectedToken(Token.FIELD_NAME, source.nextToken(), source); - String firstFieldName = source.currentName(); - source.nextToken(); - int firstHash = extractItem(source); - if (source.currentToken() == Token.END_OBJECT) { - // Just one routing key in this object - // Use ^ like Map.Entry's hashcode - return Murmur3HashFunction.hash(firstFieldName) ^ firstHash; - } - List hashes = new ArrayList<>(); - hashes.add(new NameAndHash(firstFieldName, firstHash)); - do { + private static void extractObject(List hashes, @Nullable String path, XContentParser source) throws IOException { + while (source.currentToken() != Token.END_OBJECT) { ensureExpectedToken(Token.FIELD_NAME, source.currentToken(), source); String fieldName = source.currentName(); + String subPath = path == null ? fieldName : path + "." + fieldName; source.nextToken(); - hashes.add(new NameAndHash(fieldName, extractItem(source))); - } while (source.currentToken() != Token.END_OBJECT); - Collections.sort(hashes, Comparator.comparing(nameAndHash -> nameAndHash.name)); - /* - * This is the same as Arrays.hash(Map.Entry) but we're - * writing it out so for extra paranoia. Changing this will change how - * documents are routed and we don't want a jdk update that modifies Arrays - * or Map.Entry to sneak up on us. - */ - int hash = 0; - for (NameAndHash nameAndHash : hashes) { - int thisHash = Murmur3HashFunction.hash(nameAndHash.name) ^ nameAndHash.hash; - hash = 31 * hash + thisHash; + extractItem(hashes, subPath, source); } - return hash; } - private static int extractItem(XContentParser source) throws IOException { - if (source.currentToken() == Token.START_OBJECT) { - int hash = extractObject(source); - source.nextToken(); - return hash; + private static void extractItem(List hashes, String path, XContentParser source) throws IOException { + switch (source.currentToken()) { + case START_OBJECT: + source.nextToken(); + extractObject(hashes, path, source); + source.nextToken(); + break; + case VALUE_STRING: + hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text())))); + source.nextToken(); + break; + case VALUE_NULL: + source.nextToken(); + break; + default: + throw new ParsingException( + source.getTokenLocation(), + "Routing values must be strings but found [{}]", + source.currentToken() + ); } - if (source.currentToken() == Token.VALUE_STRING) { - int hash = Murmur3HashFunction.hash(source.text()); - source.nextToken(); - return hash; + } + + private static int hash(BytesRef ref) { + return StringHelper.murmurhash3_x86_32(ref, 0); + } + + private static int hashesToHash(List hashes) { + Collections.sort(hashes); + Iterator itr = hashes.iterator(); + if (itr.hasNext() == false) { + throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields"); + } + NameAndHash prev = itr.next(); + int hash = hash(prev.name) ^ prev.hash; + while (itr.hasNext()) { + NameAndHash next = itr.next(); + if (prev.name.equals(next.name)) { + throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]"); + } + int thisHash = hash(next.name) ^ next.hash; + hash = 31 * hash + thisHash; + prev = next; } - throw new ParsingException(source.getTokenLocation(), "Routing values must be strings but found [{}]", source.currentToken()); + return hash; } @Override @@ -316,5 +330,10 @@ private String error(String operation) { } } - private record NameAndHash(String name, int hash) {} + private static record NameAndHash(BytesRef name, int hash) implements Comparable { + @Override + public int compareTo(NameAndHash o) { + return name.compareTo(o.name); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java index b18c90fecc326..5509ee5cc0d4d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java @@ -7,6 +7,8 @@ */ package org.elasticsearch.cluster.routing; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.StringHelper; import org.elasticsearch.Version; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -513,7 +515,7 @@ public void testRoutingPathOneSub() throws IOException { assertIndexShard( routing, Map.of("foo", Map.of("bar", "cat"), "baz", "dog"), - Math.floorMod(hash(List.of("foo", List.of("bar", "cat"))), shards) + Math.floorMod(hash(List.of("foo.bar", "cat")), shards) ); } @@ -523,10 +525,16 @@ public void testRoutingPathManySubs() throws IOException { assertIndexShard( routing, Map.of("foo", Map.of("a", "cat"), "bar", Map.of("thing", "yay", "this", "too")), - Math.floorMod(hash(List.of("bar", List.of("thing", "yay", "this", "too"), "foo", List.of("a", "cat"))), shards) + Math.floorMod(hash(List.of("bar.thing", "yay", "bar.this", "too", "foo.a", "cat")), shards) ); } + public void testRoutingPathDotInName() throws IOException { + int shards = between(2, 1000); + IndexRouting routing = indexRoutingForPath(shards, "foo.bar"); + assertIndexShard(routing, Map.of("foo.bar", "cat", "baz", "dog"), Math.floorMod(hash(List.of("foo.bar", "cat")), shards)); + } + public void testRoutingPathBwc() throws IOException { Version version = VersionUtils.randomIndexCompatibleVersion(random()); IndexRouting routing = indexRoutingForPath(version, 8, "dim.*,other.*,top"); @@ -538,12 +546,13 @@ public void testRoutingPathBwc() throws IOException { * versions of Elasticsearch must continue to route based on the * version on the index. */ - assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 0); + assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 4); assertIndexShard(routing, Map.of("dim", Map.of("a", "b")), 5); assertIndexShard(routing, Map.of("dim", Map.of("c", "d")), 4); - assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 5); - assertIndexShard(routing, Map.of("top", "a"), 3); - assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 2); + assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 7); + assertIndexShard(routing, Map.of("top", "a"), 5); + assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 0); + assertIndexShard(routing, Map.of("dim.a", "a"), 4); } private IndexRouting indexRoutingForPath(int shards, String path) { @@ -560,8 +569,8 @@ private IndexRouting indexRoutingForPath(Version createdVersion, int shards, Str ); } - private void assertIndexShard(IndexRouting routing, Map source, int id) throws IOException { - assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(id)); + private void assertIndexShard(IndexRouting routing, Map source, int expected) throws IOException { + assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(expected)); } private BytesReference source(Map doc) throws IOException { @@ -581,24 +590,14 @@ private BytesReference source(Map doc) throws IOException { /** * Build the hash we expect from the extracter. */ - private int hash(List keysAndValues) { + private int hash(List keysAndValues) { assertThat(keysAndValues.size() % 2, equalTo(0)); int hash = 0; for (int i = 0; i < keysAndValues.size(); i += 2) { - int thisHash = Murmur3HashFunction.hash(keysAndValues.get(i).toString()) ^ expectedValueHash(keysAndValues.get(i + 1)); - hash = hash * 31 + thisHash; + int keyHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i)), 0); + int valueHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i + 1)), 0); + hash = hash * 31 + (keyHash ^ valueHash); } return hash; } - - private int expectedValueHash(Object value) { - if (value instanceof List) { - return hash((List) value); - } - if (value instanceof String) { - return Murmur3HashFunction.hash((String) value); - } - throw new IllegalArgumentException("Unsupported value: " + value); - } - }