From 36ff00ec1d90dd33b9d46378972f6b0b0007d219 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 25 May 2022 16:02:23 +0100 Subject: [PATCH 01/21] Deduplicate mappings in persisted cluster state WIP --- .../cluster/metadata/IndexMetadata.java | 24 +++- .../cluster/metadata/Metadata.java | 3 +- .../gateway/PersistedClusterStateService.java | 120 +++++++++++++++--- .../cluster/metadata/MetadataTests.java | 2 +- 4 files changed, 125 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index c67285b3f6175..63037a3d0f094 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -68,6 +68,7 @@ import java.util.function.Function; import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM; +import static org.elasticsearch.cluster.metadata.Metadata.DEDUPLICATED_MAPPINGS_PARAM; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.validateIpValue; @@ -476,6 +477,7 @@ public Iterator> settings() { static final String KEY_SETTINGS = "settings"; static final String KEY_STATE = "state"; static final String KEY_MAPPINGS = "mappings"; + static final String KEY_MAPPINGS_HASH = "mappings_hash"; static final String KEY_ALIASES = "aliases"; static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; @@ -1008,7 +1010,11 @@ public static Diff readDiffFrom(StreamInput in) throws IOExceptio } public static IndexMetadata fromXContent(XContentParser parser) throws IOException { - return Builder.fromXContent(parser); + return Builder.fromXContent(parser, null); + } + + public static IndexMetadata fromXContent(XContentParser parser, Map mappingsByHash) throws IOException { + return Builder.fromXContent(parser, mappingsByHash); } @Override @@ -1738,7 +1744,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build } builder.endObject(); - if (context != Metadata.XContentContext.API) { + if (context == Metadata.XContentContext.GATEWAY && params.paramAsBoolean(DEDUPLICATED_MAPPINGS_PARAM, false)) { + MappingMetadata mmd = indexMetadata.mapping(); + if (mmd != null) { + builder.field(KEY_MAPPINGS_HASH, mmd.source().getSha256()); + } + } else if (context != Metadata.XContentContext.API) { builder.startArray(KEY_MAPPINGS); MappingMetadata mmd = indexMetadata.mapping(); if (mmd != null) { @@ -1818,7 +1829,7 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build builder.endObject(); } - public static IndexMetadata fromXContent(XContentParser parser) throws IOException { + public static IndexMetadata fromXContent(XContentParser parser, Map mappingsByHash) throws IOException { if (parser.currentToken() == null) { // fresh parser? move to the first token parser.nextToken(); } @@ -1934,6 +1945,11 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti } case KEY_ROUTING_NUM_SHARDS -> builder.setRoutingNumShards(parser.intValue()); case KEY_SYSTEM -> builder.system(parser.booleanValue()); + case KEY_MAPPINGS_HASH -> { + assert mappingsByHash != null : "no deduplicated mappings given"; + assert mappingsByHash.containsKey(parser.text()) : "mapping with hash [" + parser.text() + "] not found"; + builder.putMapping(mappingsByHash.get(parser.text())); + } default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } } else { @@ -2131,7 +2147,7 @@ public void toXContent(XContentBuilder builder, IndexMetadata state) throws IOEx @Override public IndexMetadata fromXContent(XContentParser parser) throws IOException { - return Builder.fromXContent(parser); + return Builder.fromXContent(parser, null); } }; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 6e6753b790300..05e3037d113ac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -189,6 +189,7 @@ default boolean isRestorable() { public static final String CONTEXT_MODE_API = XContentContext.API.toString(); + public static final String DEDUPLICATED_MAPPINGS_PARAM = "deduplicated_mappings"; public static final String GLOBAL_STATE_FILE_PREFIX = "global-"; private static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); @@ -2194,7 +2195,7 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { builder.persistentSettings(Settings.fromXContent(parser)); } else if ("indices".equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - builder.put(IndexMetadata.Builder.fromXContent(parser), false); + builder.put(IndexMetadata.Builder.fromXContent(parser, null), false); } } else if ("hashes_of_consistent_settings".equals(currentFieldName)) { builder.hashesOfConsistentSettings(parser.mapStrings()); diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 8c746098da1cd..67a6f68743ee1 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -43,12 +43,14 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.Loggers; @@ -69,6 +71,7 @@ import org.elasticsearch.env.NodeMetadata; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; @@ -84,6 +87,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -91,6 +95,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.function.IntPredicate; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -100,12 +105,13 @@ * to record the last-accepted cluster state during publication. The metadata is written incrementally where possible, leaving alone any * documents that have not changed. The index has the following fields: * - * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+ - * | "type" (string field) | "index_uuid" (string field) | "data" (stored binary field in SMILE format) | "page" | "last_page" | - * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+ - * | GLOBAL_TYPE_NAME == "global" | (omitted) | Global metadata | large docs are | - * | INDEX_TYPE_NAME == "index" | Index UUID | Index metadata | split into pages | - * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+ + * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+ + * | "type" (string field) | ID (string) field | "data" (stored binary field in SMILE format) | "page" | "last_page" | + * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+ + * | GLOBAL_TYPE_NAME == "global" | (none) | Global metadata | large docs are | + * | INDEX_TYPE_NAME == "index" | "index_uuid" | Index metadata | split into pages | + * | MAPPING_TYPE_NAME == "mapping" | "mapping_hash" | Mapping metadata | split into pages | + * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+ * * Additionally each commit has the following user data: * @@ -130,8 +136,10 @@ public class PersistedClusterStateService { public static final String TYPE_FIELD_NAME = "type"; public static final String GLOBAL_TYPE_NAME = "global"; public static final String INDEX_TYPE_NAME = "index"; + public static final String MAPPING_TYPE_NAME = "mapping"; private static final String DATA_FIELD_NAME = "data"; private static final String INDEX_UUID_FIELD_NAME = "index_uuid"; + private static final String MAPPING_HASH_FIELD_NAME = "mapping_hash"; public static final String PAGE_FIELD_NAME = "page"; public static final String LAST_PAGE_FIELD_NAME = "last_page"; public static final int IS_LAST_PAGE = 1; @@ -490,7 +498,7 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw searcher.setQueryCache(null); final SetOnce builderReference = new SetOnce<>(); - consumeFromType(searcher, GLOBAL_TYPE_NAME, bytes -> { + consumeFromType(searcher, GLOBAL_TYPE_NAME, ignored -> GLOBAL_TYPE_NAME, bytes -> { final Metadata metadata = readXContent(bytes, Metadata.Builder::fromXContent); logger.trace("found global metadata with last-accepted term [{}]", metadata.coordinationMetadata().term()); if (builderReference.get() != null) { @@ -504,11 +512,51 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw throw new CorruptStateException("no global metadata found in [" + dataPath + "]"); } - logger.trace("got global metadata, now reading index metadata"); + logger.trace("got global metadata, now reading mapping metadata"); + + final Map mappingsByHash = new HashMap<>(); + consumeFromType(searcher, MAPPING_TYPE_NAME, document -> document.getField(MAPPING_HASH_FIELD_NAME).stringValue(), bytes -> { + logger.trace("found mapping metadata: {}", Base64.getEncoder().encodeToString(BytesReference.toBytes(bytes))); + final var mappingMetadata = readXContent(bytes, parser -> { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new CorruptStateException( + "invalid mapping metadata: expected START_OBJECT but got [" + parser.currentToken() + "]" + ); + } + if (parser.nextToken() != XContentParser.Token.FIELD_NAME) { + throw new CorruptStateException( + "invalid mapping metadata: expected FIELD_NAME but got [" + parser.currentToken() + "]" + ); + } + final var fieldName = parser.currentName(); + if ("content".equals(fieldName) == false) { + throw new CorruptStateException("invalid mapping metadata: unknown field [" + fieldName + "]"); + } + if (parser.nextToken() != XContentParser.Token.VALUE_EMBEDDED_OBJECT) { + throw new CorruptStateException( + "invalid mapping metadata: expected VALUE_EMBEDDED_OBJECT but got [" + parser.currentToken() + "]" + ); + } + final var binaryValue = parser.binaryValue(); + logger.trace("loading mapping: {}", Base64.getEncoder().encodeToString(binaryValue)); + return new MappingMetadata(new CompressedXContent(binaryValue)); + }); + final var hash = mappingMetadata.source().getSha256(); + logger.trace( + "found mapping metadata with hash {}: {}", + hash, + Base64.getEncoder().encodeToString(BytesReference.toBytes(bytes)) + ); + if (mappingsByHash.put(hash, mappingMetadata) != null) { + throw new CorruptStateException("duplicate metadata found for mapping hash [" + hash + "]"); + } + }); + + logger.trace("got metadata for [{}] mappings, now reading mapping metadata", mappingsByHash.size()); final Set indexUUIDs = new HashSet<>(); - consumeFromType(searcher, INDEX_TYPE_NAME, bytes -> { - final IndexMetadata indexMetadata = readXContent(bytes, IndexMetadata::fromXContent); + consumeFromType(searcher, INDEX_TYPE_NAME, document -> document.getField(INDEX_UUID_FIELD_NAME).stringValue(), bytes -> { + final IndexMetadata indexMetadata = readXContent(bytes, parser -> IndexMetadata.fromXContent(parser, mappingsByHash)); logger.trace("found index metadata for {}", indexMetadata.getIndex()); if (indexUUIDs.add(indexMetadata.getIndexUUID()) == false) { throw new CorruptStateException("duplicate metadata found for " + indexMetadata.getIndex() + " in [" + dataPath + "]"); @@ -544,6 +592,7 @@ private T readXContent(BytesReference bytes, CheckedFunction keyFunction, CheckedConsumer bytesReferenceConsumer ) throws IOException { @@ -589,13 +638,7 @@ private static void consumeFromType( // startup, on the main thread and before most other services have started, and we will need space to serialize the // whole cluster state in memory later on. - final String key; - if (type.equals(GLOBAL_TYPE_NAME)) { - key = GLOBAL_TYPE_NAME; - } else { - key = document.getField(INDEX_UUID_FIELD_NAME).stringValue(); - } - + final var key = keyFunction.apply(document); final PaginatedDocumentReader reader = documentReaders.computeIfAbsent(key, k -> new PaginatedDocumentReader()); final BytesReference bytesReference = reader.addPage(key, documentData, pageIndex, isLastPage); if (bytesReference != null) { @@ -629,6 +672,7 @@ private static BytesReference uncompress(BytesReference bytesReference) throws I Map params = Maps.newMapWithExpectedSize(2); params.put("binary", "true"); params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY); + params.put(Metadata.DEDUPLICATED_MAPPINGS_PARAM, Boolean.TRUE.toString()); FORMAT_PARAMS = new ToXContent.MapParams(params); } @@ -687,6 +731,11 @@ void deleteIndexMetadata(String indexUUID) throws IOException { indexWriter.deleteDocuments(new Term(INDEX_UUID_FIELD_NAME, indexUUID)); } + public void deleteMappingMetadata(String mappingHash) throws IOException { + this.logger.trace("removing mapping metadata for [{}]", mappingHash); + indexWriter.deleteDocuments(new Term(MAPPING_HASH_FIELD_NAME, mappingHash)); + } + void flush() throws IOException { this.logger.trace("flushing"); this.indexWriter.flush(); @@ -865,6 +914,19 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata addGlobalMetadataDocuments(metadata); } + final var previousMappingHashes = new HashSet<>(previouslyWrittenMetadata.getMappingsByHash().keySet()); + for (final var entry : metadata.getMappingsByHash().entrySet()) { + if (previousMappingHashes.remove(entry.getKey()) == false) { + addMappingDocuments(entry.getKey(), entry.getValue()); + } + } + + for (final var unusedMappingHash : previousMappingHashes) { + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.deleteMappingMetadata(unusedMappingHash); + } + } + final Map indexMetadataVersionByUUID = Maps.newMapWithExpectedSize(previouslyWrittenMetadata.indices().size()); for (IndexMetadata indexMetadata : previouslyWrittenMetadata.indices().values()) { final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion()); @@ -922,6 +984,24 @@ private static int lastPageValue(boolean isLastPage) { return isLastPage ? IS_LAST_PAGE : IS_NOT_LAST_PAGE; } + private void addMappingDocuments(String key, MappingMetadata mappingMetadata) throws IOException { + logger.trace("writing mapping metadata with hash [{}]", key); + writePages( + (builder, params) -> builder.field("content", mappingMetadata.source().compressed()), + (((bytesRef, pageIndex, isLastPage) -> { + final Document document = new Document(); + document.add(new StringField(TYPE_FIELD_NAME, MAPPING_TYPE_NAME, Field.Store.NO)); + document.add(new StringField(MAPPING_HASH_FIELD_NAME, key, Field.Store.YES)); + document.add(new StoredField(PAGE_FIELD_NAME, pageIndex)); + document.add(new StoredField(LAST_PAGE_FIELD_NAME, lastPageValue(isLastPage))); + document.add(new StoredField(DATA_FIELD_NAME, bytesRef)); + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.indexWriter.addDocument(document); + } + })) + ); + } + private void addIndexMetadataDocuments(IndexMetadata indexMetadata) throws IOException { final String indexUUID = indexMetadata.getIndexUUID(); assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false; @@ -953,7 +1033,7 @@ private void addGlobalMetadataDocuments(Metadata metadata) throws IOException { }); } - private void writePages(ToXContent metadata, PageWriter pageWriter) throws IOException { + private void writePages(ToXContentFragment metadata, PageWriter pageWriter) throws IOException { try ( PageWriterOutputStream paginatedStream = new PageWriterOutputStream(documentBuffer, pageWriter); OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(paginatedStream); @@ -981,6 +1061,10 @@ private WriterStats overwriteMetadata(Metadata metadata) throws IOException { private WriterStats addMetadata(Metadata metadata) throws IOException { addGlobalMetadataDocuments(metadata); + for (final var entry : metadata.getMappingsByHash().entrySet()) { + addMappingDocuments(entry.getKey(), entry.getValue()); + } + for (IndexMetadata indexMetadata : metadata.indices().values()) { addIndexMetadataDocuments(indexMetadata); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 1676df18ed6fc..0e3c4b4512fef 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -491,7 +491,7 @@ public void testUnknownFieldIndexMetadata() throws IOException { JsonXContent.contentBuilder().startObject().startObject("index_name").field("random", "value").endObject().endObject() ); try (XContentParser parser = createParser(JsonXContent.jsonXContent, metadata)) { - IndexMetadata.Builder.fromXContent(parser); + IndexMetadata.Builder.fromXContent(parser, null); fail(); } catch (IllegalArgumentException e) { assertEquals("Unexpected field [random]", e.getMessage()); From 213e93328f36c4667c7404b088fac4dc93b6d214 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 25 May 2022 16:10:47 +0100 Subject: [PATCH 02/21] Less noise in logs --- .../gateway/PersistedClusterStateService.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 67a6f68743ee1..c7ddafc7f205d 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -87,7 +87,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -516,7 +515,6 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw final Map mappingsByHash = new HashMap<>(); consumeFromType(searcher, MAPPING_TYPE_NAME, document -> document.getField(MAPPING_HASH_FIELD_NAME).stringValue(), bytes -> { - logger.trace("found mapping metadata: {}", Base64.getEncoder().encodeToString(BytesReference.toBytes(bytes))); final var mappingMetadata = readXContent(bytes, parser -> { if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new CorruptStateException( @@ -537,16 +535,10 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw "invalid mapping metadata: expected VALUE_EMBEDDED_OBJECT but got [" + parser.currentToken() + "]" ); } - final var binaryValue = parser.binaryValue(); - logger.trace("loading mapping: {}", Base64.getEncoder().encodeToString(binaryValue)); - return new MappingMetadata(new CompressedXContent(binaryValue)); + return new MappingMetadata(new CompressedXContent(parser.binaryValue())); }); final var hash = mappingMetadata.source().getSha256(); - logger.trace( - "found mapping metadata with hash {}: {}", - hash, - Base64.getEncoder().encodeToString(BytesReference.toBytes(bytes)) - ); + logger.trace("found mapping metadata with hash {}", hash); if (mappingsByHash.put(hash, mappingMetadata) != null) { throw new CorruptStateException("duplicate metadata found for mapping hash [" + hash + "]"); } From 2d2f135525bf44668b9daadfe0520ed91b68154a Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 28 Jun 2022 13:40:15 -0400 Subject: [PATCH 03/21] Clean up this comment table --- .../org/elasticsearch/gateway/PersistedClusterStateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index bf052b563be5a..1205e0ad0a37d 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -112,7 +112,7 @@ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+ * | GLOBAL_TYPE_NAME == "global" | (none) | Global metadata | large docs are | * | INDEX_TYPE_NAME == "index" | "index_uuid" | Index metadata | split into pages | - * | MAPPING_TYPE_NAME == "mapping" | "mapping_hash" | Mapping metadata | split into pages | + * | MAPPING_TYPE_NAME == "mapping" | "mapping_hash" | Mapping metadata | | * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+ * * Additionally each commit has the following user data: From 477f803b794db3e270fe43f172b8eb91f742e56a Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 28 Jun 2022 13:40:33 -0400 Subject: [PATCH 04/21] Fix this log message --- .../org/elasticsearch/gateway/PersistedClusterStateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 1205e0ad0a37d..3d216d106f7fb 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -585,7 +585,7 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw } }); - logger.trace("got metadata for [{}] mappings, now reading mapping metadata", mappingsByHash.size()); + logger.trace("got metadata for [{}] mappings, now reading index metadata", mappingsByHash.size()); final Set indexUUIDs = new HashSet<>(); consumeFromType(searcher, INDEX_TYPE_NAME, document -> document.getField(INDEX_UUID_FIELD_NAME).stringValue(), bytes -> { From 5e39806674956adbcb66af8d3b53f30af1e583a6 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 28 Jun 2022 16:27:51 -0400 Subject: [PATCH 05/21] Add another fromXContent arity --- .../org/elasticsearch/cluster/metadata/IndexMetadata.java | 8 ++++++-- .../java/org/elasticsearch/cluster/metadata/Metadata.java | 2 +- .../org/elasticsearch/cluster/metadata/MetadataTests.java | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 2c8d9510abb9d..0ce0e39a21a05 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -1097,7 +1097,7 @@ public static Diff readDiffFrom(StreamInput in) throws IOExceptio } public static IndexMetadata fromXContent(XContentParser parser) throws IOException { - return Builder.fromXContent(parser, null); + return Builder.fromXContent(parser); } public static IndexMetadata fromXContent(XContentParser parser, Map mappingsByHash) throws IOException { @@ -1923,6 +1923,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build builder.endObject(); } + public static IndexMetadata fromXContent(XContentParser parser) throws IOException { + return fromXContent(parser, null); + } + public static IndexMetadata fromXContent(XContentParser parser, Map mappingsByHash) throws IOException { if (parser.currentToken() == null) { // fresh parser? move to the first token parser.nextToken(); @@ -2241,7 +2245,7 @@ public void toXContent(XContentBuilder builder, IndexMetadata state) throws IOEx @Override public IndexMetadata fromXContent(XContentParser parser) throws IOException { - return Builder.fromXContent(parser, null); + return Builder.fromXContent(parser); } }; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 249307dfa0d31..3809390019795 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -2290,7 +2290,7 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { builder.persistentSettings(Settings.fromXContent(parser)); } else if ("indices".equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - builder.put(IndexMetadata.Builder.fromXContent(parser, null), false); + builder.put(IndexMetadata.Builder.fromXContent(parser), false); } } else if ("hashes_of_consistent_settings".equals(currentFieldName)) { builder.hashesOfConsistentSettings(parser.mapStrings()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 0d3f2e5c6518f..7312926f5c669 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -490,7 +490,7 @@ public void testUnknownFieldIndexMetadata() throws IOException { JsonXContent.contentBuilder().startObject().startObject("index_name").field("random", "value").endObject().endObject() ); try (XContentParser parser = createParser(JsonXContent.jsonXContent, metadata)) { - IndexMetadata.Builder.fromXContent(parser, null); + IndexMetadata.Builder.fromXContent(parser); fail(); } catch (IllegalArgumentException e) { assertEquals("Unexpected field [random]", e.getMessage()); From 2f9378d24e8d0a6f10c1178da4443a660e3ba918 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 30 Jun 2022 11:00:26 -0400 Subject: [PATCH 06/21] Use numIndicesAdded to pass the indices count for isFullWrite --- .../elasticsearch/gateway/PersistedClusterStateService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 3d216d106f7fb..5607ae363e36b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -1108,7 +1108,7 @@ private WriterStats addMetadata(Metadata metadata) throws IOException { metadataIndexWriter.flush(); } - return new WriterStats(true, true, 0, 0, metadata.indices().size(), 0); + return new WriterStats(true, true, 0, metadata.indices().size(), 0, 0); } public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion, Version oldestIndexVersion) @@ -1214,7 +1214,7 @@ private record WriterStats( @Override public String toString() { if (isFullWrite) { - return String.format(Locale.ROOT, "wrote global metadata and metadata for [%d] indices", numIndicesUpdated); + return String.format(Locale.ROOT, "wrote global metadata and metadata for [%d] indices", numIndicesAdded); } else { return String.format( Locale.ROOT, From 6493aed3a7343014083d23b19342a5b817bfdbcf Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 30 Jun 2022 11:01:34 -0400 Subject: [PATCH 07/21] Reword this log message to reflect specifically that there's no action required for an index, specifically --- .../org/elasticsearch/gateway/PersistedClusterStateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 5607ae363e36b..e54a4ae6bd3ac 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -992,7 +992,7 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata addIndexMetadataDocuments(indexMetadata); } else { numIndicesUnchanged++; - logger.trace("no action required for [{}]", indexMetadata.getIndex()); + logger.trace("no action required for index [{}]", indexMetadata.getIndex()); } indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID()); } From e32e52314c34dac9c900f35c36adef3676f36eb5 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 30 Jun 2022 11:02:18 -0400 Subject: [PATCH 08/21] Track mapping counts via WriterStats There's no tracking of 'mappings changed' because we're storing things by hash, so mappings are removed and/or added but we can't see 'changes' as a reified thing. --- .../gateway/PersistedClusterStateService.java | 39 +++++++++++++++++-- .../PersistedClusterStateServiceTests.java | 15 +++++-- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index e54a4ae6bd3ac..0430f31f546f7 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -947,16 +947,24 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata addGlobalMetadataDocuments(metadata); } + int numMappingsAdded = 0; + int numMappingsRemoved = 0; + int numMappingsUnchanged = 0; final var previousMappingHashes = new HashSet<>(previouslyWrittenMetadata.getMappingsByHash().keySet()); for (final var entry : metadata.getMappingsByHash().entrySet()) { if (previousMappingHashes.remove(entry.getKey()) == false) { addMappingDocuments(entry.getKey(), entry.getValue()); + numMappingsAdded++; + } else { + logger.trace("no action required for mapping [{}]", entry.getKey()); + numMappingsUnchanged++; } } for (final var unusedMappingHash : previousMappingHashes) { for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { metadataIndexWriter.deleteMappingMetadata(unusedMappingHash); + numMappingsRemoved++; } } @@ -1010,7 +1018,17 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata metadataIndexWriter.flush(); } - return new WriterStats(false, updateGlobalMeta, numIndicesUnchanged, numIndicesAdded, numIndicesUpdated, numIndicesRemoved); + return new WriterStats( + false, + updateGlobalMeta, + numMappingsUnchanged, + numMappingsAdded, + numMappingsRemoved, + numIndicesUnchanged, + numIndicesAdded, + numIndicesUpdated, + numIndicesRemoved + ); } private static int lastPageValue(boolean isLastPage) { @@ -1108,7 +1126,7 @@ private WriterStats addMetadata(Metadata metadata) throws IOException { metadataIndexWriter.flush(); } - return new WriterStats(true, true, 0, metadata.indices().size(), 0, 0); + return new WriterStats(true, true, 0, metadata.getMappingsByHash().size(), 0, 0, metadata.indices().size(), 0, 0); } public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion, Version oldestIndexVersion) @@ -1206,6 +1224,9 @@ public void close() throws IOException { private record WriterStats( boolean isFullWrite, boolean globalMetaUpdated, + int numMappingsUnchanged, + int numMappingsAdded, + int numMappingsRemoved, int numIndicesUnchanged, int numIndicesAdded, int numIndicesUpdated, @@ -1214,14 +1235,24 @@ private record WriterStats( @Override public String toString() { if (isFullWrite) { - return String.format(Locale.ROOT, "wrote global metadata and metadata for [%d] indices", numIndicesAdded); + return String.format( + Locale.ROOT, + "wrote global metadata, [%d] mappings, and metadata for [%d] indices", + numMappingsAdded, + numIndicesAdded + ); } else { return String.format( Locale.ROOT, """ - [%s] global metadata, wrote metadata for [%d] new indices and [%d] existing indices, \ + [%s] global metadata, \ + wrote [%d] new mappings, removed [%d] mappings and skipped [%d] unchanged mappings, \ + wrote metadata for [%d] new indices and [%d] existing indices, \ removed metadata for [%d] indices and skipped [%d] unchanged indices""", globalMetaUpdated ? "wrote" : "skipped writing", + numMappingsAdded, + numMappingsRemoved, + numMappingsUnchanged, numIndicesAdded, numIndicesUpdated, numIndicesRemoved, diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 21ccaff0f646f..e6219f5465eba 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -1194,7 +1194,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { Level.WARN, """ writing full cluster state took [*] which is above the warn threshold of [*]; \ - wrote global metadata and metadata for [0] indices""" + wrote global metadata, [0] mappings, and metadata for [0] indices""" ) ); @@ -1210,7 +1210,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { Level.WARN, """ writing full cluster state took [*] which is above the warn threshold of [*]; \ - wrote global metadata and metadata for [0] indices""" + wrote global metadata, [0] mappings, and metadata for [0] indices""" ) ); @@ -1244,7 +1244,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { Level.WARN, """ writing full cluster state took [*] which is above the warn threshold of [*]; \ - wrote global metadata and metadata for [0] indices""" + wrote global metadata, [0] mappings, and metadata for [0] indices""" ) ); @@ -1254,6 +1254,14 @@ public void testSlowLogging() throws IOException, IllegalAccessException { .version(clusterState.version()) .put( IndexMetadata.builder("test") + .putMapping(""" + { + "_doc": { + "properties": { + "somefield": { "type": "text" } + } + } + }""") .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1277,6 +1285,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { Level.WARN, """ writing cluster state took [*] which is above the warn threshold of [*]; [skipped writing] global metadata, \ + wrote [1] new mappings, removed [0] mappings and skipped [0] unchanged mappings, \ wrote metadata for [1] new indices and [0] existing indices, removed metadata for [0] indices and \ skipped [0] unchanged indices""" ) From ae65a22a0a9ab8d66b92ed3c86672ba74acbd647 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 6 Jul 2022 10:16:44 -0400 Subject: [PATCH 09/21] Add MAPPING_TYPE_NAME and handle no docs (null scorer) --- .../PersistedClusterStateServiceTests.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index e6219f5465eba..6c29f75826d8d 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -87,6 +87,7 @@ import static org.elasticsearch.gateway.PersistedClusterStateService.IS_LAST_PAGE; import static org.elasticsearch.gateway.PersistedClusterStateService.IS_NOT_LAST_PAGE; import static org.elasticsearch.gateway.PersistedClusterStateService.LAST_PAGE_FIELD_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.MAPPING_TYPE_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.METADATA_DIRECTORY_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.PAGE_FIELD_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.TYPE_FIELD_NAME; @@ -1073,19 +1074,21 @@ public void testHandlesShuffledDocuments() throws IOException { commitUserData = reader.getIndexCommit().getUserData(); final IndexSearcher indexSearcher = new IndexSearcher(reader); indexSearcher.setQueryCache(null); - for (String typeName : new String[] { GLOBAL_TYPE_NAME, INDEX_TYPE_NAME }) { + for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) { final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { final Scorer scorer = weight.scorer(leafReaderContext); - final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); - final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (isLiveDoc.test(docIdSetIterator.docID())) { - final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); - document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); - documents.add(document); + if (scorer != null) { + final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); + final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); + documents.add(document); + } } } } From bdd9bddb30d4fa6fd0efda9ded85126791df77ee Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 6 Jul 2022 11:25:47 -0400 Subject: [PATCH 10/21] Fix this test The behavior of writeIncrementalStateAndCommit relies on the previousState that we pass in actually being a correct representation of the on-disk state -- this test did not respect that. Rather, it did clusterState->newClusterState twice in a row, so now we inject a null->clusterState between the two so that everything works right. Also we're doing more writes now, so I bumped the 14 constant to 16. --- .../PersistedClusterStateServiceTests.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 6c29f75826d8d..943e8d44dede5 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -1294,7 +1294,21 @@ public void testSlowLogging() throws IOException, IllegalAccessException { ) ); + // force a full write, so that the next write is an actual incremental write from clusterState->newClusterState writeDurationMillis.set(randomLongBetween(0, writeDurationMillis.get() - 1)); + assertExpectedLogs( + 1L, + null, + clusterState, + writer, + new MockLogAppender.UnseenEventExpectation( + "should not see warning below threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "*" + ) + ); + assertExpectedLogs( 1L, clusterState, @@ -1308,7 +1322,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { ) ); - assertThat(currentTime.get(), lessThan(startTimeMillis + 14 * slowWriteLoggingThresholdMillis)); // ensure no overflow + assertThat(currentTime.get(), lessThan(startTimeMillis + 16 * slowWriteLoggingThresholdMillis)); // ensure no overflow } } } From fa5acf65d9d5a3388ea6d87aba8a1378f2d85a7d Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 6 Jul 2022 15:33:54 -0400 Subject: [PATCH 11/21] Don't duplicate mappings here (and switch to constants) --- .../gateway/PersistedClusterStateServiceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 943e8d44dede5..f57e439752f31 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -727,7 +727,8 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { Directory dupDirectory = newFSDirectory(dupPath.resolve(METADATA_DIRECTORY_NAME)) ) { try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { - indexWriter.deleteDocuments(new Term("type", "global")); // do not duplicate global metadata + indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME)); // do not duplicate global metadata + indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, MAPPING_TYPE_NAME)); // do not duplicate mappings indexWriter.addIndexes(dupDirectory); indexWriter.commit(); } From 3f53456a523fc3de6ee5c174c73b1390ae4cb716 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 6 Jul 2022 16:47:51 -0400 Subject: [PATCH 12/21] Add mappings to existing tests to exercise this code --- .../PersistedClusterStateServiceTests.java | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index f57e439752f31..d796f3712d83f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; @@ -61,6 +62,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.xcontent.XContentFactory; import java.io.IOError; import java.io.IOException; @@ -705,6 +707,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { .put( IndexMetadata.builder(indexName) .version(1L) + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -775,6 +778,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws .put( IndexMetadata.builder("test") .version(indexMetadataVersion - 1) // -1 because it's incremented in .put() + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -803,6 +807,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws Metadata.builder(clusterState.metadata()) .put( IndexMetadata.builder(indexMetadata) + .putMapping(indexMetadata.mapping()) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -830,6 +835,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws Metadata.builder(clusterState.metadata()) .put( IndexMetadata.builder(indexMetadata) + .putMapping(randomMappingString()) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -860,6 +866,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws ) .put( IndexMetadata.builder(indexMetadata) + .putMapping(randomMappingString()) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -904,6 +911,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .coordinationMetadata(CoordinationMetadata.builder(clusterState.coordinationMetadata()).term(term).build()) .put( IndexMetadata.builder("updated") + .putMapping(randomMappingString()) .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() .settings( Settings.builder() @@ -915,6 +923,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc ) .put( IndexMetadata.builder("deleted") + .putMapping(randomMappingString()) .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() .settings( Settings.builder() @@ -952,6 +961,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .remove("deleted") .put( IndexMetadata.builder("updated") + .putMapping(randomMappingString()) .settings( Settings.builder() .put(clusterState.metadata().index("updated").getSettings()) @@ -961,6 +971,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .put( IndexMetadata.builder("added") .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) @@ -1010,6 +1021,7 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException { .version(i + 2) .put( IndexMetadata.builder(index.getName()) + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1043,6 +1055,7 @@ public void testHandlesShuffledDocuments() throws IOException { for (int i = between(5, 20); i >= 0; i--) { metadata.put( IndexMetadata.builder("test-" + i) + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1258,14 +1271,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { .version(clusterState.version()) .put( IndexMetadata.builder("test") - .putMapping(""" - { - "_doc": { - "properties": { - "somefield": { "type": "text" } - } - } - }""") + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1380,6 +1386,7 @@ public void testLimitsFileCount() throws IOException { .version(i + 2) .put( IndexMetadata.builder("index-" + i) + .putMapping(randomMappingString()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1511,6 +1518,7 @@ public void testOldestIndexVersionIsCorrectlySerialized() throws IOException { for (Version indexVersion : indexVersions) { String indexUUID = UUIDs.randomBase64UUID(random()); IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName("index", lastIndexNum)) + .putMapping(randomMappingString()) .settings(settings(indexVersion).put(IndexMetadata.SETTING_INDEX_UUID, indexUUID)) .numberOfShards(1) .numberOfReplicas(1) @@ -1656,6 +1664,24 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException ); } + private String randomMappingString() { + int i = randomIntBetween(0, 3); + try { + return Strings.toString( + XContentFactory.jsonBuilder() + .startObject() + .startObject("_doc") + .startObject("properties") + .field("field" + i, "text") + .endObject() + .endObject() + .endObject() + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException { final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false); return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata); From b23005771da5ccf2cdeb2403e9e13ba0fede7ce1 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 8 Jul 2022 10:34:09 -0400 Subject: [PATCH 13/21] Add a file-counting mapping deduplication test --- .../PersistedClusterStateServiceTests.java | 126 +++++++++++++++++- 1 file changed, 125 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index d796f3712d83f..4846a4dc5b4f2 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -76,6 +76,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntPredicate; @@ -1604,6 +1605,129 @@ public void testDebugLogging() throws IOException, IllegalAccessException { } } + public void testDeduplicatedMappings() throws IOException { + final Path dataPath = createTempDir(); + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + try (Writer writer = persistedClusterStateService.createWriter()) { + + Set hashes; + Metadata.Builder metadata; + ClusterState clusterState; + ClusterState previousState; + + // generate two mappings + String mapping1 = randomMappingString(); + String mapping2 = randomValueOtherThan(mapping1, PersistedClusterStateServiceTests::randomMappingString); + + // build and write a cluster state with metadata that has all indices using a single mapping + metadata = Metadata.builder(); + for (int i = between(5, 20); i >= 0; i--) { + metadata.put( + IndexMetadata.builder("test-" + i) + .putMapping(mapping1) + .settings( + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ) + ); + } + clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build(); + assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(1)); + writer.writeFullStateAndCommit(0L, clusterState); + + // verify that the on-disk state reflects 1 mapping + hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME)); + assertThat(hashes.size(), equalTo(1)); + assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes)); + + previousState = clusterState; + metadata = Metadata.builder(previousState.metadata()); + + // add a second mapping -- either by adding a new index or changing an existing one + if (randomBoolean()) { + // add another index with a different mapping + metadata.put( + IndexMetadata.builder("test-" + 99) + .putMapping(mapping2) + .settings( + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ) + ); + } else { + // change an existing index to a different mapping + String index = randomFrom(previousState.metadata().getIndices().keySet()); + metadata.put(IndexMetadata.builder(metadata.get(index)).putMapping(mapping2)); + } + clusterState = ClusterState.builder(previousState).metadata(metadata).build(); + assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(2)); + writer.writeIncrementalStateAndCommit(0L, previousState, clusterState); + + // verify that the on-disk state reflects 2 mappings + hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME)); + assertThat(hashes.size(), equalTo(2)); + assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes)); + + previousState = clusterState; + metadata = Metadata.builder(previousState.metadata()); + + // update all indices to use the second mapping + for (String index : previousState.metadata().getIndices().keySet()) { + metadata.put(IndexMetadata.builder(metadata.get(index)).putMapping(mapping2)); + } + clusterState = ClusterState.builder(previousState).metadata(metadata).build(); + assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(1)); + writer.writeIncrementalStateAndCommit(0L, previousState, clusterState); + + // verify that the on-disk reflects 1 mapping + hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME)); + assertThat(hashes.size(), equalTo(1)); + assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes)); + } + } + } + + /** + * Search the underlying persisted state indices for non-deleted mapping_hash documents that represent the + * first page of data, collecting and returning the distinct mapping_hashes themselves. + */ + private Set loadPersistedMappingHashes(Path metadataDirectory) throws IOException { + Set hashes = new HashSet<>(); + try (Directory directory = new NIOFSDirectory(metadataDirectory); DirectoryReader reader = DirectoryReader.open(directory)) { + final IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + + final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, MAPPING_TYPE_NAME)); + final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); + for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { + final Scorer scorer = weight.scorer(leafReaderContext); + if (scorer != null) { + final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); + final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + int page = document.getField("page").numericValue().intValue(); + if (page == 0) { + String hash = document.getField("mapping_hash").stringValue(); + hashes.add(hash); + } + } + } + } + } + } + return hashes; + } + private boolean findSegmentInDirectory(Path dataPath) throws IOException { Directory d = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)); @@ -1664,7 +1788,7 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException ); } - private String randomMappingString() { + private static String randomMappingString() { int i = randomIntBetween(0, 3); try { return Strings.toString( From a2638ae953a29f7a4b53fc94eb439b9ed4b65b88 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 11 Jul 2022 13:25:27 -0400 Subject: [PATCH 14/21] Quick addendums to previous commit --- .../gateway/PersistedClusterStateServiceTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 4846a4dc5b4f2..b638550952298 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -1698,7 +1698,7 @@ public void testDeduplicatedMappings() throws IOException { * Search the underlying persisted state indices for non-deleted mapping_hash documents that represent the * first page of data, collecting and returning the distinct mapping_hashes themselves. */ - private Set loadPersistedMappingHashes(Path metadataDirectory) throws IOException { + private static Set loadPersistedMappingHashes(Path metadataDirectory) throws IOException { Set hashes = new HashSet<>(); try (Directory directory = new NIOFSDirectory(metadataDirectory); DirectoryReader reader = DirectoryReader.open(directory)) { final IndexSearcher indexSearcher = new IndexSearcher(reader); @@ -1718,7 +1718,7 @@ private Set loadPersistedMappingHashes(Path metadataDirectory) throws IO int page = document.getField("page").numericValue().intValue(); if (page == 0) { String hash = document.getField("mapping_hash").stringValue(); - hashes.add(hash); + assertTrue(hashes.add(hash)); } } } From b8d311a44cb7b3474dd6792908fdd6763f27ff94 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 12 Jul 2022 10:36:12 -0400 Subject: [PATCH 15/21] Allow null mappings sometimes --- .../PersistedClusterStateServiceTests.java | 60 +++++++++---------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index b638550952298..9b45fb0e2a25d 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -41,11 +41,11 @@ import org.elasticsearch.cluster.coordination.CoordinationMetadata; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; @@ -58,11 +58,11 @@ import org.elasticsearch.env.NodeMetadata; import org.elasticsearch.gateway.PersistedClusterStateService.Writer; import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xcontent.XContentFactory; import java.io.IOError; import java.io.IOException; @@ -708,7 +708,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { .put( IndexMetadata.builder(indexName) .version(1L) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -779,7 +779,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws .put( IndexMetadata.builder("test") .version(indexMetadataVersion - 1) // -1 because it's incremented in .put() - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -836,7 +836,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws Metadata.builder(clusterState.metadata()) .put( IndexMetadata.builder(indexMetadata) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -867,7 +867,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws ) .put( IndexMetadata.builder(indexMetadata) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -912,7 +912,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .coordinationMetadata(CoordinationMetadata.builder(clusterState.coordinationMetadata()).term(term).build()) .put( IndexMetadata.builder("updated") - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() .settings( Settings.builder() @@ -924,7 +924,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc ) .put( IndexMetadata.builder("deleted") - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() .settings( Settings.builder() @@ -962,7 +962,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .remove("deleted") .put( IndexMetadata.builder("updated") - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(clusterState.metadata().index("updated").getSettings()) @@ -972,7 +972,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .put( IndexMetadata.builder("added") .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) @@ -1022,7 +1022,7 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException { .version(i + 2) .put( IndexMetadata.builder(index.getName()) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1056,7 +1056,7 @@ public void testHandlesShuffledDocuments() throws IOException { for (int i = between(5, 20); i >= 0; i--) { metadata.put( IndexMetadata.builder("test-" + i) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1137,7 +1137,11 @@ public void testHandlesShuffledDocuments() throws IOException { final boolean isOnlyPageForIndex = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(INDEX_TYPE_NAME) && corruptDocPage == 0 && corruptDocIsLastPage; + final boolean isOnlyPageForMapping = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(MAPPING_TYPE_NAME) + && corruptDocPage == 0 + && corruptDocIsLastPage; if (isOnlyPageForIndex == false // don't remove the only doc for an index, this just loses the index and doesn't corrupt + && isOnlyPageForMapping == false // asdflkjasdflkjasdlkj && rarely()) { documents.remove(corruptIndex); } else { @@ -1272,7 +1276,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { .version(clusterState.version()) .put( IndexMetadata.builder("test") - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(false)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1387,7 +1391,7 @@ public void testLimitsFileCount() throws IOException { .version(i + 2) .put( IndexMetadata.builder("index-" + i) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1519,7 +1523,7 @@ public void testOldestIndexVersionIsCorrectlySerialized() throws IOException { for (Version indexVersion : indexVersions) { String indexUUID = UUIDs.randomBase64UUID(random()); IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName("index", lastIndexNum)) - .putMapping(randomMappingString()) + .putMapping(randomMappingMetadata(true)) .settings(settings(indexVersion).put(IndexMetadata.SETTING_INDEX_UUID, indexUUID)) .numberOfShards(1) .numberOfReplicas(1) @@ -1617,8 +1621,8 @@ public void testDeduplicatedMappings() throws IOException { ClusterState previousState; // generate two mappings - String mapping1 = randomMappingString(); - String mapping2 = randomValueOtherThan(mapping1, PersistedClusterStateServiceTests::randomMappingString); + MappingMetadata mapping1 = randomMappingMetadata(false); + MappingMetadata mapping2 = randomValueOtherThan(mapping1, () -> randomMappingMetadata(false)); // build and write a cluster state with metadata that has all indices using a single mapping metadata = Metadata.builder(); @@ -1788,21 +1792,15 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException ); } - private static String randomMappingString() { - int i = randomIntBetween(0, 3); - try { - return Strings.toString( - XContentFactory.jsonBuilder() - .startObject() - .startObject("_doc") - .startObject("properties") - .field("field" + i, "text") - .endObject() - .endObject() - .endObject() + private static MappingMetadata randomMappingMetadata(boolean allowNull) { + int i = randomIntBetween(0, 4); + if (i == 0 && allowNull) { + return null; + } else { + return new MappingMetadata( + MapperService.SINGLE_MAPPING_NAME, + Map.of("_doc", Map.of("properties", Map.of("field" + i, "text"))) ); - } catch (IOException e) { - throw new RuntimeException(e); } } From 03c9781b09e844fc47c4b3eef1da9d428d6ef42f Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 12 Jul 2022 16:34:13 -0400 Subject: [PATCH 16/21] Fix a placeholder comment :D --- .../gateway/PersistedClusterStateServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 9b45fb0e2a25d..6b0ee471b4da3 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -1141,7 +1141,7 @@ public void testHandlesShuffledDocuments() throws IOException { && corruptDocPage == 0 && corruptDocIsLastPage; if (isOnlyPageForIndex == false // don't remove the only doc for an index, this just loses the index and doesn't corrupt - && isOnlyPageForMapping == false // asdflkjasdflkjasdlkj + && isOnlyPageForMapping == false // similarly, don't remove the only doc for a mapping, this causes an AssertionError && rarely()) { documents.remove(corruptIndex); } else { From dc68507c59d5028c1f1ad3177a0afe55fd5139be Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 13 Jul 2022 13:40:28 -0400 Subject: [PATCH 17/21] Add tests for duplicated or missing mappings --- .../PersistedClusterStateServiceTests.java | 171 ++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 6b0ee471b4da3..ee4af579589a2 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -19,6 +19,7 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.Term; import org.apache.lucene.search.DocIdSetIterator; @@ -1609,6 +1610,176 @@ public void testDebugLogging() throws IOException, IllegalAccessException { } } + public void testFailsIfMappingIsDuplicated() throws IOException { + final Path dataPath = createTempDir(); + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put( + IndexMetadata.builder("test-1") + .putMapping(randomMappingMetadata(false)) + .settings( + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ) + ) + ) + .build(); + + String hash = clusterState.metadata().getMappingsByHash().keySet().iterator().next(); + + try (Writer writer = persistedClusterStateService.createWriter()) { + writer.writeFullStateAndCommit(0L, clusterState); + } + + final List documents = new ArrayList<>(); + final Map commitUserData; + + try ( + Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)); + DirectoryReader reader = DirectoryReader.open(directory) + ) { + commitUserData = reader.getIndexCommit().getUserData(); + final IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) { + final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); + final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); + for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { + final Scorer scorer = weight.scorer(leafReaderContext); + if (scorer != null) { + final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); + final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); + documents.add(document); + } + } + } + } + } + } + + // duplicate all documents associated with the mapping in question + for (Document document : new ArrayList<>(documents)) { // iterating a copy + IndexableField mappingHash = document.getField("mapping_hash"); + if (mappingHash != null && mappingHash.stringValue().equals(hash)) { + documents.add(document); + } + } + + try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + for (Document document : documents) { + indexWriter.addDocument(document); + } + indexWriter.setLiveCommitData(commitUserData.entrySet()); + indexWriter.commit(); + } + } + + final String message = expectThrows(CorruptStateException.class, () -> persistedClusterStateService.loadBestOnDiskState()) + .getMessage(); + assertEquals("duplicate metadata found for mapping hash [" + hash + "]", message); + } + } + + public void testFailsIfMappingIsMissing() throws IOException { + final Path dataPath = createTempDir(); + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put( + IndexMetadata.builder("test-1") + .putMapping(randomMappingMetadata(false)) + .settings( + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ) + ) + ) + .build(); + + String hash = clusterState.metadata().getMappingsByHash().keySet().iterator().next(); + + try (Writer writer = persistedClusterStateService.createWriter()) { + writer.writeFullStateAndCommit(0L, clusterState); + } + + final List documents = new ArrayList<>(); + final Map commitUserData; + + try ( + Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)); + DirectoryReader reader = DirectoryReader.open(directory) + ) { + commitUserData = reader.getIndexCommit().getUserData(); + final IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) { + final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); + final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); + for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { + final Scorer scorer = weight.scorer(leafReaderContext); + if (scorer != null) { + final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); + final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); + documents.add(document); + } + } + } + } + } + } + + // remove all documents associated with the mapping in question + for (Document document : new ArrayList<>(documents)) { // iterating a copy + IndexableField mappingHash = document.getField("mapping_hash"); + if (mappingHash != null && mappingHash.stringValue().equals(hash)) { + documents.remove(document); + } + } + + try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + for (Document document : documents) { + indexWriter.addDocument(document); + } + indexWriter.setLiveCommitData(commitUserData.entrySet()); + indexWriter.commit(); + } + } + + final String message = expectThrows(AssertionError.class, () -> persistedClusterStateService.loadBestOnDiskState()) + .getMessage(); + assertEquals("mapping with hash [" + hash + "] not found", message); + } + } + public void testDeduplicatedMappings() throws IOException { final Path dataPath = createTempDir(); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) { From 197c0e4d486d465ae8f3c58bb7474f6cdabf9b07 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 13 Jul 2022 14:49:21 -0400 Subject: [PATCH 18/21] Extract two utility methods to cut some repetition --- .../PersistedClusterStateServiceTests.java | 172 ++++++------------ 1 file changed, 51 insertions(+), 121 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index ee4af579589a2..db7ea0ae62237 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -80,6 +80,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.IntPredicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1088,42 +1089,12 @@ public void testHandlesShuffledDocuments() throws IOException { DirectoryReader reader = DirectoryReader.open(directory) ) { commitUserData = reader.getIndexCommit().getUserData(); - final IndexSearcher indexSearcher = new IndexSearcher(reader); - indexSearcher.setQueryCache(null); - for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) { - final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); - final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); - for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { - final Scorer scorer = weight.scorer(leafReaderContext); - if (scorer != null) { - final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); - final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (isLiveDoc.test(docIdSetIterator.docID())) { - final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); - document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); - documents.add(document); - } - } - } - } - } + forEachDocument(reader, Set.of(GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME), documents::add); } Randomness.shuffle(documents); - try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { - final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); - indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); - try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { - for (Document document : documents) { - indexWriter.addDocument(document); - } - indexWriter.setLiveCommitData(commitUserData.entrySet()); - indexWriter.commit(); - } - } + writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents); final ClusterState loadedState = loadPersistedClusterState(persistedClusterStateService); assertEquals(clusterState.metadata().indices(), loadedState.metadata().indices()); @@ -1157,17 +1128,7 @@ && rarely()) { } } - try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { - final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); - indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); - try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { - for (Document document : documents) { - indexWriter.addDocument(document); - } - indexWriter.setLiveCommitData(commitUserData.entrySet()); - indexWriter.commit(); - } - } + writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents); expectThrows(CorruptStateException.class, () -> loadPersistedClusterState(persistedClusterStateService)); } @@ -1646,27 +1607,7 @@ public void testFailsIfMappingIsDuplicated() throws IOException { DirectoryReader reader = DirectoryReader.open(directory) ) { commitUserData = reader.getIndexCommit().getUserData(); - final IndexSearcher indexSearcher = new IndexSearcher(reader); - indexSearcher.setQueryCache(null); - for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) { - final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); - final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); - for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { - final Scorer scorer = weight.scorer(leafReaderContext); - if (scorer != null) { - final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); - final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (isLiveDoc.test(docIdSetIterator.docID())) { - final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); - document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); - documents.add(document); - } - } - } - } - } + forEachDocument(reader, Set.of(GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME), documents::add); } // duplicate all documents associated with the mapping in question @@ -1677,17 +1618,7 @@ public void testFailsIfMappingIsDuplicated() throws IOException { } } - try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { - final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); - indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); - try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { - for (Document document : documents) { - indexWriter.addDocument(document); - } - indexWriter.setLiveCommitData(commitUserData.entrySet()); - indexWriter.commit(); - } - } + writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents); final String message = expectThrows(CorruptStateException.class, () -> persistedClusterStateService.loadBestOnDiskState()) .getMessage(); @@ -1731,27 +1662,7 @@ public void testFailsIfMappingIsMissing() throws IOException { DirectoryReader reader = DirectoryReader.open(directory) ) { commitUserData = reader.getIndexCommit().getUserData(); - final IndexSearcher indexSearcher = new IndexSearcher(reader); - indexSearcher.setQueryCache(null); - for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) { - final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); - final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); - for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { - final Scorer scorer = weight.scorer(leafReaderContext); - if (scorer != null) { - final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); - final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (isLiveDoc.test(docIdSetIterator.docID())) { - final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); - document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); - documents.add(document); - } - } - } - } - } + forEachDocument(reader, Set.of(GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME), documents::add); } // remove all documents associated with the mapping in question @@ -1762,17 +1673,7 @@ public void testFailsIfMappingIsMissing() throws IOException { } } - try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) { - final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); - indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); - try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { - for (Document document : documents) { - indexWriter.addDocument(document); - } - indexWriter.setLiveCommitData(commitUserData.entrySet()); - indexWriter.commit(); - } - } + writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents); final String message = expectThrows(AssertionError.class, () -> persistedClusterStateService.loadBestOnDiskState()) .getMessage(); @@ -1870,16 +1771,13 @@ public void testDeduplicatedMappings() throws IOException { } /** - * Search the underlying persisted state indices for non-deleted mapping_hash documents that represent the - * first page of data, collecting and returning the distinct mapping_hashes themselves. + * Utility method for applying a consumer to each document (of the given types) associated with a DirectoryReader. */ - private static Set loadPersistedMappingHashes(Path metadataDirectory) throws IOException { - Set hashes = new HashSet<>(); - try (Directory directory = new NIOFSDirectory(metadataDirectory); DirectoryReader reader = DirectoryReader.open(directory)) { - final IndexSearcher indexSearcher = new IndexSearcher(reader); - indexSearcher.setQueryCache(null); - - final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, MAPPING_TYPE_NAME)); + private static void forEachDocument(DirectoryReader reader, Set types, Consumer consumer) throws IOException { + final IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + for (String typeName : types) { + final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName)); final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { final Scorer scorer = weight.scorer(leafReaderContext); @@ -1890,16 +1788,48 @@ private static Set loadPersistedMappingHashes(Path metadataDirectory) th while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { if (isLiveDoc.test(docIdSetIterator.docID())) { final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); - int page = document.getField("page").numericValue().intValue(); - if (page == 0) { - String hash = document.getField("mapping_hash").stringValue(); - assertTrue(hashes.add(hash)); - } + document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); + consumer.accept(document); } } } } } + } + + /** + * Utility method writing documents back to a directory. + */ + private static void writeDocumentsAndCommit(Path metadataDirectory, Map commitUserData, List documents) + throws IOException { + try (Directory directory = new NIOFSDirectory(metadataDirectory)) { + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + for (Document document : documents) { + indexWriter.addDocument(document); + } + indexWriter.setLiveCommitData(commitUserData.entrySet()); + indexWriter.commit(); + } + } + } + + /** + * Search the underlying persisted state indices for non-deleted mapping_hash documents that represent the + * first page of data, collecting and returning the distinct mapping_hashes themselves. + */ + private static Set loadPersistedMappingHashes(Path metadataDirectory) throws IOException { + Set hashes = new HashSet<>(); + try (Directory directory = new NIOFSDirectory(metadataDirectory); DirectoryReader reader = DirectoryReader.open(directory)) { + forEachDocument(reader, Set.of(MAPPING_TYPE_NAME), document -> { + int page = document.getField("page").numericValue().intValue(); + if (page == 0) { + String hash = document.getField("mapping_hash").stringValue(); + assertTrue(hashes.add(hash)); + } + }); + } return hashes; } From 29e79fe877333a8660d462321ef57dd31e4835c3 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 13 Jul 2022 16:20:17 -0400 Subject: [PATCH 19/21] Switch to an exception rather than an assertion for CorruptStateException parity --- .../org/elasticsearch/cluster/metadata/IndexMetadata.java | 4 +++- .../gateway/PersistedClusterStateService.java | 8 +++++++- .../gateway/PersistedClusterStateServiceTests.java | 5 +++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 4fab81af92ad7..c8337e40287de 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -2047,7 +2047,9 @@ public static IndexMetadata fromXContent(XContentParser parser, Map builder.system(parser.booleanValue()); case KEY_MAPPINGS_HASH -> { assert mappingsByHash != null : "no deduplicated mappings given"; - assert mappingsByHash.containsKey(parser.text()) : "mapping with hash [" + parser.text() + "] not found"; + if (mappingsByHash.containsKey(parser.text()) == false) { + throw new IllegalArgumentException("mapping with hash [" + parser.text() + "] not found"); + } builder.putMapping(mappingsByHash.get(parser.text())); } default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 0430f31f546f7..52d0215556ba4 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -589,7 +589,13 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw final Set indexUUIDs = new HashSet<>(); consumeFromType(searcher, INDEX_TYPE_NAME, document -> document.getField(INDEX_UUID_FIELD_NAME).stringValue(), bytes -> { - final IndexMetadata indexMetadata = readXContent(bytes, parser -> IndexMetadata.fromXContent(parser, mappingsByHash)); + final IndexMetadata indexMetadata = readXContent(bytes, parser -> { + try { + return IndexMetadata.fromXContent(parser, mappingsByHash); + } catch (Exception e) { + throw new CorruptStateException(e); + } + }); logger.trace("found index metadata for {}", indexMetadata.getIndex()); if (indexUUIDs.add(indexMetadata.getIndexUUID()) == false) { throw new CorruptStateException("duplicate metadata found for " + indexMetadata.getIndex() + " in [" + dataPath + "]"); diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index db7ea0ae62237..b3d347b877695 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -1675,9 +1675,10 @@ public void testFailsIfMappingIsMissing() throws IOException { writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents); - final String message = expectThrows(AssertionError.class, () -> persistedClusterStateService.loadBestOnDiskState()) + final String message = expectThrows(CorruptStateException.class, () -> persistedClusterStateService.loadBestOnDiskState()) + .getCause() .getMessage(); - assertEquals("mapping with hash [" + hash + "] not found", message); + assertEquals("java.lang.IllegalArgumentException: mapping with hash [" + hash + "] not found", message); } } From 3c6c73d8f81677758a8a81e89dcc1f7a32258fb8 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 13 Jul 2022 17:20:59 -0400 Subject: [PATCH 20/21] Update docs/changelog/88479.yaml --- docs/changelog/88479.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/88479.yaml diff --git a/docs/changelog/88479.yaml b/docs/changelog/88479.yaml new file mode 100644 index 0000000000000..5febaf0ab1232 --- /dev/null +++ b/docs/changelog/88479.yaml @@ -0,0 +1,5 @@ +pr: 88479 +summary: Deduplicate mappings in persisted cluster state +area: Cluster Coordination +type: enhancement +issues: [] From bd4b402a64baa58fccd4e6eb55d18d733928ac24 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 18 Jul 2022 13:36:59 -0400 Subject: [PATCH 21/21] Rework null handling of random mapping metadata --- .../PersistedClusterStateServiceTests.java | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index b3d347b877695..f09257922aee1 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -710,7 +710,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { .put( IndexMetadata.builder(indexName) .version(1L) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -781,7 +781,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws .put( IndexMetadata.builder("test") .version(indexMetadataVersion - 1) // -1 because it's incremented in .put() - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -838,7 +838,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws Metadata.builder(clusterState.metadata()) .put( IndexMetadata.builder(indexMetadata) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -869,7 +869,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws ) .put( IndexMetadata.builder(indexMetadata) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(indexMetadata.getSettings()) @@ -914,7 +914,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .coordinationMetadata(CoordinationMetadata.builder(clusterState.coordinationMetadata()).term(term).build()) .put( IndexMetadata.builder("updated") - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() .settings( Settings.builder() @@ -926,7 +926,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc ) .put( IndexMetadata.builder("deleted") - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() .settings( Settings.builder() @@ -964,7 +964,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .remove("deleted") .put( IndexMetadata.builder("updated") - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(clusterState.metadata().index("updated").getSettings()) @@ -974,7 +974,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .put( IndexMetadata.builder("added") .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) @@ -1024,7 +1024,7 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException { .version(i + 2) .put( IndexMetadata.builder(index.getName()) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1058,7 +1058,7 @@ public void testHandlesShuffledDocuments() throws IOException { for (int i = between(5, 20); i >= 0; i--) { metadata.put( IndexMetadata.builder("test-" + i) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1238,7 +1238,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { .version(clusterState.version()) .put( IndexMetadata.builder("test") - .putMapping(randomMappingMetadata(false)) + .putMapping(randomMappingMetadata()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1353,7 +1353,7 @@ public void testLimitsFileCount() throws IOException { .version(i + 2) .put( IndexMetadata.builder("index-" + i) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1485,7 +1485,7 @@ public void testOldestIndexVersionIsCorrectlySerialized() throws IOException { for (Version indexVersion : indexVersions) { String indexUUID = UUIDs.randomBase64UUID(random()); IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName("index", lastIndexNum)) - .putMapping(randomMappingMetadata(true)) + .putMapping(randomMappingMetadataOrNull()) .settings(settings(indexVersion).put(IndexMetadata.SETTING_INDEX_UUID, indexUUID)) .numberOfShards(1) .numberOfReplicas(1) @@ -1581,7 +1581,7 @@ public void testFailsIfMappingIsDuplicated() throws IOException { Metadata.builder() .put( IndexMetadata.builder("test-1") - .putMapping(randomMappingMetadata(false)) + .putMapping(randomMappingMetadata()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1636,7 +1636,7 @@ public void testFailsIfMappingIsMissing() throws IOException { Metadata.builder() .put( IndexMetadata.builder("test-1") - .putMapping(randomMappingMetadata(false)) + .putMapping(randomMappingMetadata()) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1694,8 +1694,8 @@ public void testDeduplicatedMappings() throws IOException { ClusterState previousState; // generate two mappings - MappingMetadata mapping1 = randomMappingMetadata(false); - MappingMetadata mapping2 = randomValueOtherThan(mapping1, () -> randomMappingMetadata(false)); + MappingMetadata mapping1 = randomMappingMetadata(); + MappingMetadata mapping2 = randomValueOtherThan(mapping1, () -> randomMappingMetadata()); // build and write a cluster state with metadata that has all indices using a single mapping metadata = Metadata.builder(); @@ -1894,15 +1894,17 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException ); } - private static MappingMetadata randomMappingMetadata(boolean allowNull) { + private static MappingMetadata randomMappingMetadata() { + int i = randomIntBetween(1, 4); + return new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, Map.of("_doc", Map.of("properties", Map.of("field" + i, "text")))); + } + + private static MappingMetadata randomMappingMetadataOrNull() { int i = randomIntBetween(0, 4); - if (i == 0 && allowNull) { + if (i == 0) { return null; } else { - return new MappingMetadata( - MapperService.SINGLE_MAPPING_NAME, - Map.of("_doc", Map.of("properties", Map.of("field" + i, "text"))) - ); + return randomMappingMetadata(); } }