diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 8facb6c5980bf..4fab81af92ad7 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -72,6 +72,7 @@ import java.util.function.Function;
 
 import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
+import static org.elasticsearch.cluster.metadata.Metadata.DEDUPLICATED_MAPPINGS_PARAM;
 import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
 import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
 import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.validateIpValue;
@@ -480,6 +481,7 @@ public Iterator<Setting<?>> settings() {
     static final String KEY_SETTINGS = "settings";
     static final String KEY_STATE = "state";
     static final String KEY_MAPPINGS = "mappings";
+    static final String KEY_MAPPINGS_HASH = "mappings_hash";
     static final String KEY_ALIASES = "aliases";
     static final String KEY_ROLLOVER_INFOS = "rollover_info";
     static final String KEY_SYSTEM = "system";
@@ -1100,6 +1102,10 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOException {
         return Builder.fromXContent(parser);
     }
 
+    public static IndexMetadata fromXContent(XContentParser parser, Map<String, MappingMetadata> mappingsByHash) throws IOException {
+        return Builder.fromXContent(parser, mappingsByHash);
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         Builder.toXContent(this, builder, params);
@@ -1834,7 +1840,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, Params params)
             }
             builder.endObject();
 
-            if (context != Metadata.XContentContext.API) {
+            if (context == Metadata.XContentContext.GATEWAY && params.paramAsBoolean(DEDUPLICATED_MAPPINGS_PARAM, false)) {
+                MappingMetadata mmd = indexMetadata.mapping();
+                if (mmd != null) {
+                    builder.field(KEY_MAPPINGS_HASH, mmd.source().getSha256());
+                }
+            } else if (context != Metadata.XContentContext.API) {
                 builder.startArray(KEY_MAPPINGS);
                 MappingMetadata mmd = indexMetadata.mapping();
                 if (mmd != null) {
@@ -1915,6 +1926,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, Params params)
         }
 
         public static IndexMetadata fromXContent(XContentParser parser) throws IOException {
+            return fromXContent(parser, null);
+        }
+
+        public static IndexMetadata fromXContent(XContentParser parser, Map<String, MappingMetadata> mappingsByHash) throws IOException {
             if (parser.currentToken() == null) { // fresh parser? move to the first token
                 parser.nextToken();
             }
@@ -2030,6 +2045,11 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOException {
                 }
                 case KEY_ROUTING_NUM_SHARDS -> builder.setRoutingNumShards(parser.intValue());
                 case KEY_SYSTEM -> builder.system(parser.booleanValue());
+                case KEY_MAPPINGS_HASH -> {
+                    assert mappingsByHash != null : "no deduplicated mappings given";
+                    assert mappingsByHash.containsKey(parser.text()) : "mapping with hash [" + parser.text() + "] not found";
+                    builder.putMapping(mappingsByHash.get(parser.text()));
+                }
                 default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
             }
         } else {
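Where this serialization previously always inlined the mapping, the GATEWAY + `deduplicated_mappings` path writes only `mappings_hash`, the SHA-256 of the compressed mapping source, and the reader resolves that hash against the `mappingsByHash` map supplied to `fromXContent`. A minimal, self-contained sketch of the content-hashing idea (the class and sample bytes are illustrative; the real digest comes from `CompressedXContent#getSha256`):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;

// Illustrative only: byte-identical mapping sources digest to the same key, so the
// cluster state can persist the mapping once and let every index document carry
// just the small hash written under KEY_MAPPINGS_HASH above.
public class MappingHashSketch {
    static String sha256Hex(byte[] mappingSource) throws Exception {
        return HexFormat.of().formatHex(MessageDigest.getInstance("SHA-256").digest(mappingSource));
    }

    public static void main(String[] args) throws Exception {
        byte[] mapping = "{\"_doc\":{\"properties\":{\"message\":{\"type\":\"text\"}}}}"
            .getBytes(StandardCharsets.UTF_8);
        // Two indices with the same mapping produce the same key.
        System.out.println(sha256Hex(mapping).equals(sha256Hex(mapping.clone()))); // true
    }
}
```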
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
index b7dea61814dc5..3690cba2b4117 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
@@ -191,6 +191,7 @@ default boolean isRestorable() {
 
     public static final String CONTEXT_MODE_API = XContentContext.API.toString();
 
+    public static final String DEDUPLICATED_MAPPINGS_PARAM = "deduplicated_mappings";
     public static final String GLOBAL_STATE_FILE_PREFIX = "global-";
 
     private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
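The new `deduplicated_mappings` x-content param gates the behaviour above; the writer side additionally relies on `Metadata#getMappingsByHash`, which this diff uses but does not define. A hypothetical sketch of the view that method provides, with a stand-in `Mapping` record in place of `MappingMetadata`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in type: the real map holds MappingMetadata keyed by CompressedXContent#getSha256().
record Mapping(String sha256, String source) {}

class MappingsByHashSketch {
    static Map<String, Mapping> mappingsByHash(List<Mapping> perIndexMappings) {
        Map<String, Mapping> byHash = new HashMap<>();
        for (Mapping mapping : perIndexMappings) {
            byHash.putIfAbsent(mapping.sha256(), mapping); // duplicates collapse onto one entry
        }
        return byHash;
    }

    public static void main(String[] args) {
        Mapping m = new Mapping("3f2a", "{\"properties\":{}}");
        // Five indices sharing one mapping still yield a single entry to persist.
        System.out.println(mappingsByHash(List.of(m, m, m, m, m)).size()); // 1
    }
}
```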
diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
index e743de41950f5..0430f31f546f7 100644
--- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
@@ -43,12 +43,14 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.logging.Loggers;
@@ -69,6 +71,7 @@
 import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
@@ -92,6 +95,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.function.IntPredicate;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -103,12 +107,13 @@
  * to record the last-accepted cluster state during publication. The metadata is written incrementally where possible, leaving alone any
  * documents that have not changed. The index has the following fields:
  *
- * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+
- * | "type" (string field)        | "index_uuid" (string field) | "data" (stored binary field in SMILE format) | "page" | "last_page" |
- * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+
- * | GLOBAL_TYPE_NAME == "global" | (omitted)                   | Global metadata                              | large docs are      |
- * | INDEX_TYPE_NAME == "index"   | Index UUID                  | Index metadata                               | split into pages    |
- * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+
+ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+
+ * | "type" (string field)          | ID (string) field | "data" (stored binary field in SMILE format) | "page" | "last_page" |
+ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+
+ * | GLOBAL_TYPE_NAME == "global"   | (none)            | Global metadata                              | large docs are      |
+ * | INDEX_TYPE_NAME == "index"     | "index_uuid"      | Index metadata                               | split into pages    |
+ * | MAPPING_TYPE_NAME == "mapping" | "mapping_hash"    | Mapping metadata                             |                     |
+ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+
  *
  * Additionally each commit has the following user data:
  *
@@ -133,8 +138,10 @@ public class PersistedClusterStateService {
     public static final String TYPE_FIELD_NAME = "type";
     public static final String GLOBAL_TYPE_NAME = "global";
     public static final String INDEX_TYPE_NAME = "index";
+    public static final String MAPPING_TYPE_NAME = "mapping";
     private static final String DATA_FIELD_NAME = "data";
     private static final String INDEX_UUID_FIELD_NAME = "index_uuid";
+    private static final String MAPPING_HASH_FIELD_NAME = "mapping_hash";
     public static final String PAGE_FIELD_NAME = "page";
     public static final String LAST_PAGE_FIELD_NAME = "last_page";
     public static final int IS_LAST_PAGE = 1;
@@ -531,7 +538,7 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throws IOException {
         searcher.setQueryCache(null);
 
         final SetOnce<Metadata.Builder> builderReference = new SetOnce<>();
-        consumeFromType(searcher, GLOBAL_TYPE_NAME, bytes -> {
+        consumeFromType(searcher, GLOBAL_TYPE_NAME, ignored -> GLOBAL_TYPE_NAME, bytes -> {
             final Metadata metadata = readXContent(bytes, Metadata.Builder::fromXContent);
             logger.trace("found global metadata with last-accepted term [{}]", metadata.coordinationMetadata().term());
             if (builderReference.get() != null) {
@@ -545,11 +552,44 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throws IOException {
             throw new CorruptStateException("no global metadata found in [" + dataPath + "]");
         }
 
-        logger.trace("got global metadata, now reading index metadata");
+        logger.trace("got global metadata, now reading mapping metadata");
+
+        final Map<String, MappingMetadata> mappingsByHash = new HashMap<>();
+        consumeFromType(searcher, MAPPING_TYPE_NAME, document -> document.getField(MAPPING_HASH_FIELD_NAME).stringValue(), bytes -> {
+            final var mappingMetadata = readXContent(bytes, parser -> {
+                if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                    throw new CorruptStateException(
+                        "invalid mapping metadata: expected START_OBJECT but got [" + parser.currentToken() + "]"
+                    );
+                }
+                if (parser.nextToken() != XContentParser.Token.FIELD_NAME) {
+                    throw new CorruptStateException(
+                        "invalid mapping metadata: expected FIELD_NAME but got [" + parser.currentToken() + "]"
+                    );
+                }
+                final var fieldName = parser.currentName();
+                if ("content".equals(fieldName) == false) {
+                    throw new CorruptStateException("invalid mapping metadata: unknown field [" + fieldName + "]");
+                }
+                if (parser.nextToken() != XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
+                    throw new CorruptStateException(
+                        "invalid mapping metadata: expected VALUE_EMBEDDED_OBJECT but got [" + parser.currentToken() + "]"
+                    );
+                }
+                return new MappingMetadata(new CompressedXContent(parser.binaryValue()));
+            });
+            final var hash = mappingMetadata.source().getSha256();
+            logger.trace("found mapping metadata with hash {}", hash);
+            if (mappingsByHash.put(hash, mappingMetadata) != null) {
+                throw new CorruptStateException("duplicate metadata found for mapping hash [" + hash + "]");
+            }
+        });
+
+        logger.trace("got metadata for [{}] mappings, now reading index metadata", mappingsByHash.size());
 
         final Set<String> indexUUIDs = new HashSet<>();
-        consumeFromType(searcher, INDEX_TYPE_NAME, bytes -> {
-            final IndexMetadata indexMetadata = readXContent(bytes, IndexMetadata::fromXContent);
+        consumeFromType(searcher, INDEX_TYPE_NAME, document -> document.getField(INDEX_UUID_FIELD_NAME).stringValue(), bytes -> {
+            final IndexMetadata indexMetadata = readXContent(bytes, parser -> IndexMetadata.fromXContent(parser, mappingsByHash));
             logger.trace("found index metadata for {}", indexMetadata.getIndex());
             if (indexUUIDs.add(indexMetadata.getIndexUUID()) == false) {
                 throw new CorruptStateException("duplicate metadata found for " + indexMetadata.getIndex() + " in [" + dataPath + "]");
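The strict token checks above mirror the envelope that `addMappingDocuments` writes further down: a single-field object whose `content` value holds the compressed mapping bytes as embedded binary. A sketch of that round trip against Jackson's SMILE backend, which is what the binary XContent layer wraps (the `content` field name matches the diff; everything else is illustrative):

```java
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class MappingEnvelopeSketch {
    public static void main(String[] args) throws Exception {
        SmileFactory smile = new SmileFactory();
        byte[] compressedMapping = { 1, 2, 3 }; // stand-in for the compressed mapping source

        // Writer side: {"content": <binary>} -- the shape produced via builder.field("content", ...).
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (var generator = smile.createGenerator(out)) {
            generator.writeStartObject();
            generator.writeBinaryField("content", compressedMapping);
            generator.writeEndObject();
        }

        // Reader side: the same START_OBJECT / FIELD_NAME / VALUE_EMBEDDED_OBJECT sequence the
        // loader above insists on before trusting the bytes.
        try (var parser = smile.createParser(out.toByteArray())) {
            if (parser.nextToken() != JsonToken.START_OBJECT) throw new IllegalStateException("expected START_OBJECT");
            if (parser.nextToken() != JsonToken.FIELD_NAME || "content".equals(parser.getCurrentName()) == false) {
                throw new IllegalStateException("expected the [content] field");
            }
            if (parser.nextToken() != JsonToken.VALUE_EMBEDDED_OBJECT) throw new IllegalStateException("expected embedded binary");
            System.out.println(Arrays.equals(parser.getBinaryValue(), compressedMapping)); // true
        }
    }
}
```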
@@ -585,6 +625,7 @@ private <T> T readXContent(BytesReference bytes, CheckedFunction<XContentParser, T, IOException> reader)
     private static void consumeFromType(
         IndexSearcher indexSearcher,
         String type,
+        CheckedFunction<Document, String, IOException> keyFunction,
         CheckedConsumer<BytesReference, IOException> bytesReferenceConsumer
     ) throws IOException {
@@ -630,13 +671,7 @@
                     // startup, on the main thread and before most other services have started, and we will need space to serialize the
                     // whole cluster state in memory later on.
-                    final String key;
-                    if (type.equals(GLOBAL_TYPE_NAME)) {
-                        key = GLOBAL_TYPE_NAME;
-                    } else {
-                        key = document.getField(INDEX_UUID_FIELD_NAME).stringValue();
-                    }
-
+                    final var key = keyFunction.apply(document);
                     final PaginatedDocumentReader reader = documentReaders.computeIfAbsent(key, k -> new PaginatedDocumentReader());
                     final BytesReference bytesReference = reader.addPage(key, documentData, pageIndex, isLastPage);
                     if (bytesReference != null) {
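With the key extraction now pluggable, pages for a mapping are grouped by `mapping_hash` exactly as index pages are grouped by `index_uuid`. A simplified sketch of the reassembly that `PaginatedDocumentReader` performs for `consumeFromType` (the class and types here are stand-ins), tolerating pages that arrive in any order:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for PaginatedDocumentReader: buffer pages per key ("global",
// an index UUID, or a mapping hash) until the one flagged last-page completes the set.
class PageReassemblySketch {
    private final Map<String, List<byte[]>> pagesByKey = new HashMap<>();
    private final Map<String, Integer> pageCountByKey = new HashMap<>();

    /** Returns the reassembled bytes once every page for {@code key} has arrived, else null. */
    byte[] addPage(String key, byte[] page, int pageIndex, boolean isLastPage) {
        List<byte[]> pages = pagesByKey.computeIfAbsent(key, k -> new ArrayList<>());
        while (pages.size() <= pageIndex) {
            pages.add(null);
        }
        pages.set(pageIndex, page);
        if (isLastPage) {
            pageCountByKey.put(key, pageIndex + 1); // the last page fixes the expected count
        }
        Integer expected = pageCountByKey.get(key);
        if (expected == null || pages.size() < expected || pages.subList(0, expected).contains(null)) {
            return null; // still incomplete
        }
        ByteArrayOutputStream whole = new ByteArrayOutputStream();
        for (int i = 0; i < expected; i++) {
            whole.writeBytes(pages.get(i));
        }
        return whole.toByteArray();
    }
}
```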
@@ -670,6 +705,7 @@ private static BytesReference uncompress(BytesReference bytesReference) throws IOException {
         Map<String, String> params = Maps.newMapWithExpectedSize(2);
         params.put("binary", "true");
         params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY);
+        params.put(Metadata.DEDUPLICATED_MAPPINGS_PARAM, Boolean.TRUE.toString());
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }
 
@@ -728,6 +764,11 @@ void deleteIndexMetadata(String indexUUID) throws IOException {
             indexWriter.deleteDocuments(new Term(INDEX_UUID_FIELD_NAME, indexUUID));
         }
 
+        public void deleteMappingMetadata(String mappingHash) throws IOException {
+            this.logger.trace("removing mapping metadata for [{}]", mappingHash);
+            indexWriter.deleteDocuments(new Term(MAPPING_HASH_FIELD_NAME, mappingHash));
+        }
+
         void flush() throws IOException {
             this.logger.trace("flushing");
             this.indexWriter.flush();
@@ -906,6 +947,27 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata metadata) throws IOException {
                 addGlobalMetadataDocuments(metadata);
             }
 
+            int numMappingsAdded = 0;
+            int numMappingsRemoved = 0;
+            int numMappingsUnchanged = 0;
+            final var previousMappingHashes = new HashSet<>(previouslyWrittenMetadata.getMappingsByHash().keySet());
+            for (final var entry : metadata.getMappingsByHash().entrySet()) {
+                if (previousMappingHashes.remove(entry.getKey()) == false) {
+                    addMappingDocuments(entry.getKey(), entry.getValue());
+                    numMappingsAdded++;
+                } else {
+                    logger.trace("no action required for mapping [{}]", entry.getKey());
+                    numMappingsUnchanged++;
+                }
+            }
+
+            for (final var unusedMappingHash : previousMappingHashes) {
+                numMappingsRemoved++; // count once per mapping, not once per data-path writer
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.deleteMappingMetadata(unusedMappingHash);
+                }
+            }
+
             final Map<String, Long> indexMetadataVersionByUUID = Maps.newMapWithExpectedSize(previouslyWrittenMetadata.indices().size());
             previouslyWrittenMetadata.indices().forEach((name, indexMetadata) -> {
                 final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion());
@@ -938,7 +1000,7 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata metadata) throws IOException {
                     addIndexMetadataDocuments(indexMetadata);
                 } else {
                     numIndicesUnchanged++;
-                    logger.trace("no action required for [{}]", indexMetadata.getIndex());
+                    logger.trace("no action required for index [{}]", indexMetadata.getIndex());
                 }
                 indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID());
             }
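The incremental bookkeeping above is a plain set difference over mapping hashes: hashes that survive the `remove` call are already on disk, hashes that do not are written, and whatever is left of the previous set afterwards is deleted. The same decision logic as a standalone sketch (names illustrative):

```java
import java.util.HashSet;
import java.util.Set;

class MappingDiffSketch {
    record Actions(Set<String> toWrite, Set<String> toDelete, Set<String> unchanged) {}

    static Actions plan(Set<String> previousHashes, Set<String> currentHashes) {
        Set<String> toWrite = new HashSet<>(currentHashes);
        toWrite.removeAll(previousHashes);      // mappings not yet persisted
        Set<String> toDelete = new HashSet<>(previousHashes);
        toDelete.removeAll(currentHashes);      // mappings no index references any more
        Set<String> unchanged = new HashSet<>(currentHashes);
        unchanged.retainAll(previousHashes);    // already on disk, leave the documents alone
        return new Actions(toWrite, toDelete, unchanged);
    }

    public static void main(String[] args) {
        // Previous commit held mappings {a, b}; the new state holds {b, c}.
        System.out.println(plan(Set.of("a", "b"), Set.of("b", "c")));
        // toWrite=[c], toDelete=[a], unchanged=[b]
    }
}
```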
@@ -956,13 +1018,41 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata metadata) throws IOException {
                 metadataIndexWriter.flush();
             }
 
-            return new WriterStats(false, updateGlobalMeta, numIndicesUnchanged, numIndicesAdded, numIndicesUpdated, numIndicesRemoved);
+            return new WriterStats(
+                false,
+                updateGlobalMeta,
+                numMappingsUnchanged,
+                numMappingsAdded,
+                numMappingsRemoved,
+                numIndicesUnchanged,
+                numIndicesAdded,
+                numIndicesUpdated,
+                numIndicesRemoved
+            );
         }
 
         private static int lastPageValue(boolean isLastPage) {
             return isLastPage ? IS_LAST_PAGE : IS_NOT_LAST_PAGE;
         }
 
+        private void addMappingDocuments(String key, MappingMetadata mappingMetadata) throws IOException {
+            logger.trace("writing mapping metadata with hash [{}]", key);
+            writePages((builder, params) -> builder.field("content", mappingMetadata.source().compressed()), (bytesRef, pageIndex, isLastPage) -> {
+                final Document document = new Document();
+                document.add(new StringField(TYPE_FIELD_NAME, MAPPING_TYPE_NAME, Field.Store.NO));
+                document.add(new StringField(MAPPING_HASH_FIELD_NAME, key, Field.Store.YES));
+                document.add(new StoredField(PAGE_FIELD_NAME, pageIndex));
+                document.add(new StoredField(LAST_PAGE_FIELD_NAME, lastPageValue(isLastPage)));
+                document.add(new StoredField(DATA_FIELD_NAME, bytesRef));
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.indexWriter.addDocument(document);
+                }
+            });
+        }
+
         private void addIndexMetadataDocuments(IndexMetadata indexMetadata) throws IOException {
             final String indexUUID = indexMetadata.getIndexUUID();
             assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false;
@@ -994,7 +1084,7 @@ private void addGlobalMetadataDocuments(Metadata metadata) throws IOException {
             });
         }
 
-        private void writePages(ToXContent metadata, PageWriter pageWriter) throws IOException {
+        private void writePages(ToXContentFragment metadata, PageWriter pageWriter) throws IOException {
             try (
                 PageWriterOutputStream paginatedStream = new PageWriterOutputStream(documentBuffer, pageWriter);
                 OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(paginatedStream);
@@ -1022,6 +1112,10 @@ private WriterStats overwriteMetadata(Metadata metadata) throws IOException {
         private WriterStats addMetadata(Metadata metadata) throws IOException {
             addGlobalMetadataDocuments(metadata);
 
+            for (final var entry : metadata.getMappingsByHash().entrySet()) {
+                addMappingDocuments(entry.getKey(), entry.getValue());
+            }
+
             for (IndexMetadata indexMetadata : metadata.indices().values()) {
                 addIndexMetadataDocuments(indexMetadata);
             }
@@ -1032,7 +1126,7 @@ private WriterStats addMetadata(Metadata metadata) throws IOException {
                 metadataIndexWriter.flush();
             }
 
-            return new WriterStats(true, true, 0, 0, metadata.indices().size(), 0);
+            return new WriterStats(true, true, 0, metadata.getMappingsByHash().size(), 0, 0, metadata.indices().size(), 0, 0);
         }
 
         public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion, Version oldestIndexVersion)
@@ -1130,6 +1224,9 @@ public void close() throws IOException {
         private record WriterStats(
             boolean isFullWrite,
             boolean globalMetaUpdated,
+            int numMappingsUnchanged,
+            int numMappingsAdded,
+            int numMappingsRemoved,
             int numIndicesUnchanged,
             int numIndicesAdded,
             int numIndicesUpdated,
@@ -1138,14 +1235,24 @@ private record WriterStats(
             @Override
             public String toString() {
                 if (isFullWrite) {
-                    return String.format(Locale.ROOT, "wrote global metadata and metadata for [%d] indices", numIndicesUpdated);
+                    return String.format(
+                        Locale.ROOT,
+                        "wrote global metadata, [%d] mappings, and metadata for [%d] indices",
+                        numMappingsAdded,
+                        numIndicesAdded
+                    );
                 } else {
                     return String.format(
                         Locale.ROOT,
                         """
-                            [%s] global metadata, wrote metadata for [%d] new indices and [%d] existing indices, \
+                            [%s] global metadata, \
+                            wrote [%d] new mappings, removed [%d] mappings and skipped [%d] unchanged mappings, \
+                            wrote metadata for [%d] new indices and [%d] existing indices, \
                             removed metadata for [%d] indices and skipped [%d] unchanged indices""",
                         globalMetaUpdated ? "wrote" : "skipped writing",
+                        numMappingsAdded,
+                        numMappingsRemoved,
+                        numMappingsUnchanged,
                         numIndicesAdded,
                         numIndicesUpdated,
                         numIndicesRemoved,
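`writePages` streams the compressed serialization through a paging output stream; each full buffer becomes one Lucene document and the final flush is flagged as the last page. A sketch of the splitting side that pairs with the reassembly sketch earlier (the page size and callback shape are illustrative; the real writer streams and never materialises the whole blob):

```java
import java.util.Arrays;

class PageSplitSketch {
    interface PageConsumer {
        void accept(byte[] page, int pageIndex, boolean isLastPage);
    }

    // Split a serialized blob into fixed-size pages; even an empty blob yields one (last) page.
    static void writePages(byte[] blob, int pageSize, PageConsumer consumer) {
        int pageCount = Math.max(1, (blob.length + pageSize - 1) / pageSize);
        for (int i = 0; i < pageCount; i++) {
            int from = i * pageSize;
            int to = Math.min(blob.length, from + pageSize);
            consumer.accept(Arrays.copyOfRange(blob, from, to), i, i == pageCount - 1);
        }
    }

    public static void main(String[] args) {
        writePages(new byte[10], 4, (page, index, last) ->
            System.out.println("page " + index + " size " + page.length + (last ? " (last)" : "")));
        // page 0 size 4, page 1 size 4, page 2 size 2 (last)
    }
}
```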
indices""", globalMetaUpdated ? "wrote" : "skipped writing", + numMappingsAdded, + numMappingsRemoved, + numMappingsUnchanged, numIndicesAdded, numIndicesUpdated, numIndicesRemoved, diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 21ccaff0f646f..9b45fb0e2a25d 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.coordination.CoordinationMetadata; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -57,6 +58,7 @@ import org.elasticsearch.env.NodeMetadata; import org.elasticsearch.gateway.PersistedClusterStateService.Writer; import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; @@ -74,6 +76,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntPredicate; @@ -87,6 +90,7 @@ import static org.elasticsearch.gateway.PersistedClusterStateService.IS_LAST_PAGE; import static org.elasticsearch.gateway.PersistedClusterStateService.IS_NOT_LAST_PAGE; import static org.elasticsearch.gateway.PersistedClusterStateService.LAST_PAGE_FIELD_NAME; +import static org.elasticsearch.gateway.PersistedClusterStateService.MAPPING_TYPE_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.METADATA_DIRECTORY_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.PAGE_FIELD_NAME; import static org.elasticsearch.gateway.PersistedClusterStateService.TYPE_FIELD_NAME; @@ -704,6 +708,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { .put( IndexMetadata.builder(indexName) .version(1L) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -726,7 +731,8 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { Directory dupDirectory = newFSDirectory(dupPath.resolve(METADATA_DIRECTORY_NAME)) ) { try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { - indexWriter.deleteDocuments(new Term("type", "global")); // do not duplicate global metadata + indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME)); // do not duplicate global metadata + indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, MAPPING_TYPE_NAME)); // do not duplicate mappings indexWriter.addIndexes(dupDirectory); indexWriter.commit(); } @@ -773,6 +779,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws .put( IndexMetadata.builder("test") .version(indexMetadataVersion - 1) // -1 because it's incremented in .put() + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -801,6 +808,7 @@ public void 
@@ -801,6 +808,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws IOException {
             Metadata.builder(clusterState.metadata())
                 .put(
                     IndexMetadata.builder(indexMetadata)
+                        .putMapping(indexMetadata.mapping())
                         .settings(
                             Settings.builder()
                                 .put(indexMetadata.getSettings())
@@ -828,6 +836,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws IOException {
             Metadata.builder(clusterState.metadata())
                 .put(
                     IndexMetadata.builder(indexMetadata)
+                        .putMapping(randomMappingMetadata(true))
                         .settings(
                             Settings.builder()
                                 .put(indexMetadata.getSettings())
@@ -858,6 +867,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws IOException {
                 )
                 .put(
                     IndexMetadata.builder(indexMetadata)
+                        .putMapping(randomMappingMetadata(true))
                         .settings(
                             Settings.builder()
                                 .put(indexMetadata.getSettings())
@@ -902,6 +912,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOException {
             .coordinationMetadata(CoordinationMetadata.builder(clusterState.coordinationMetadata()).term(term).build())
             .put(
                 IndexMetadata.builder("updated")
+                    .putMapping(randomMappingMetadata(true))
                     .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put()
                     .settings(
                         Settings.builder()
@@ -913,6 +924,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOException {
             )
             .put(
                 IndexMetadata.builder("deleted")
+                    .putMapping(randomMappingMetadata(true))
                     .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put()
                     .settings(
                         Settings.builder()
@@ -950,6 +962,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOException {
                 .remove("deleted")
                 .put(
                     IndexMetadata.builder("updated")
+                        .putMapping(randomMappingMetadata(true))
                         .settings(
                             Settings.builder()
                                 .put(clusterState.metadata().index("updated").getSettings())
@@ -959,6 +972,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOException {
                 .put(
                     IndexMetadata.builder("added")
                         .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put()
+                        .putMapping(randomMappingMetadata(true))
                         .settings(
                             Settings.builder()
                                 .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
@@ -1008,6 +1022,7 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException {
                     .version(i + 2)
                     .put(
                         IndexMetadata.builder(index.getName())
+                            .putMapping(randomMappingMetadata(true))
                             .settings(
                                 Settings.builder()
                                     .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1041,6 +1056,7 @@ public void testHandlesShuffledDocuments() throws IOException {
         for (int i = between(5, 20); i >= 0; i--) {
             metadata.put(
                 IndexMetadata.builder("test-" + i)
+                    .putMapping(randomMappingMetadata(true))
                     .settings(
                         Settings.builder()
                             .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1073,19 +1089,21 @@ public void testHandlesShuffledDocuments() throws IOException {
             commitUserData = reader.getIndexCommit().getUserData();
             final IndexSearcher indexSearcher = new IndexSearcher(reader);
             indexSearcher.setQueryCache(null);
-            for (String typeName : new String[] { GLOBAL_TYPE_NAME, INDEX_TYPE_NAME }) {
+            for (String typeName : new String[] { GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME }) {
                 final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName));
                 final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f);
                 for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) {
                     final Scorer scorer = weight.scorer(leafReaderContext);
-                    final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
-                    final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
-                    final DocIdSetIterator docIdSetIterator = scorer.iterator();
-                    while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
-                        if (isLiveDoc.test(docIdSetIterator.docID())) {
-                            final Document document = leafReaderContext.reader().document(docIdSetIterator.docID());
-                            document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO));
-                            documents.add(document);
+                    if (scorer != null) {
+                        final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
+                        final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
+                        final DocIdSetIterator docIdSetIterator = scorer.iterator();
+                        while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                            if (isLiveDoc.test(docIdSetIterator.docID())) {
+                                final Document document = leafReaderContext.reader().document(docIdSetIterator.docID());
+                                document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO));
+                                documents.add(document);
+                            }
                         }
                     }
                 }
@@ -1119,7 +1137,11 @@ public void testHandlesShuffledDocuments() throws IOException {
             final boolean isOnlyPageForIndex = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(INDEX_TYPE_NAME)
                 && corruptDocPage == 0
                 && corruptDocIsLastPage;
+            final boolean isOnlyPageForMapping = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(MAPPING_TYPE_NAME)
+                && corruptDocPage == 0
+                && corruptDocIsLastPage;
             if (isOnlyPageForIndex == false // don't remove the only doc for an index, this just loses the index and doesn't corrupt
+                && isOnlyPageForMapping == false // don't remove the only doc for a mapping, which would just lose the mapping
                 && rarely()) {
                 documents.remove(corruptIndex);
             } else {
@@ -1194,7 +1216,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing full cluster state took [*] which is above the warn threshold of [*]; \
-                        wrote global metadata and metadata for [0] indices"""
+                        wrote global metadata, [0] mappings, and metadata for [0] indices"""
                 )
             );
 
@@ -1210,7 +1232,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing full cluster state took [*] which is above the warn threshold of [*]; \
-                        wrote global metadata and metadata for [0] indices"""
+                        wrote global metadata, [0] mappings, and metadata for [0] indices"""
                 )
            );
 
@@ -1244,7 +1266,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing full cluster state took [*] which is above the warn threshold of [*]; \
-                        wrote global metadata and metadata for [0] indices"""
+                        wrote global metadata, [0] mappings, and metadata for [0] indices"""
                 )
             );
 
@@ -1254,6 +1276,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                 .version(clusterState.version())
                 .put(
                     IndexMetadata.builder("test")
+                        .putMapping(randomMappingMetadata(false))
                         .settings(
                             Settings.builder()
                                 .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1277,12 +1300,27 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing cluster state took [*] which is above the warn threshold of [*]; [skipped writing] global metadata, \
+                        wrote [1] new mappings, removed [0] mappings and skipped [0] unchanged mappings, \
                         wrote metadata for [1] new indices and [0] existing indices, removed metadata for [0] indices and \
                         skipped [0] unchanged indices"""
                 )
             );
 
+            // force a full write, so that the next write is an actual incremental write from clusterState->newClusterState
clusterState->newClusterState writeDurationMillis.set(randomLongBetween(0, writeDurationMillis.get() - 1)); + assertExpectedLogs( + 1L, + null, + clusterState, + writer, + new MockLogAppender.UnseenEventExpectation( + "should not see warning below threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "*" + ) + ); + assertExpectedLogs( 1L, clusterState, @@ -1296,7 +1334,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException { ) ); - assertThat(currentTime.get(), lessThan(startTimeMillis + 14 * slowWriteLoggingThresholdMillis)); // ensure no overflow + assertThat(currentTime.get(), lessThan(startTimeMillis + 16 * slowWriteLoggingThresholdMillis)); // ensure no overflow } } } @@ -1353,6 +1391,7 @@ public void testLimitsFileCount() throws IOException { .version(i + 2) .put( IndexMetadata.builder("index-" + i) + .putMapping(randomMappingMetadata(true)) .settings( Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -1484,6 +1523,7 @@ public void testOldestIndexVersionIsCorrectlySerialized() throws IOException { for (Version indexVersion : indexVersions) { String indexUUID = UUIDs.randomBase64UUID(random()); IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName("index", lastIndexNum)) + .putMapping(randomMappingMetadata(true)) .settings(settings(indexVersion).put(IndexMetadata.SETTING_INDEX_UUID, indexUUID)) .numberOfShards(1) .numberOfReplicas(1) @@ -1569,6 +1609,129 @@ public void testDebugLogging() throws IOException, IllegalAccessException { } } + public void testDeduplicatedMappings() throws IOException { + final Path dataPath = createTempDir(); + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + try (Writer writer = persistedClusterStateService.createWriter()) { + + Set hashes; + Metadata.Builder metadata; + ClusterState clusterState; + ClusterState previousState; + + // generate two mappings + MappingMetadata mapping1 = randomMappingMetadata(false); + MappingMetadata mapping2 = randomValueOtherThan(mapping1, () -> randomMappingMetadata(false)); + + // build and write a cluster state with metadata that has all indices using a single mapping + metadata = Metadata.builder(); + for (int i = between(5, 20); i >= 0; i--) { + metadata.put( + IndexMetadata.builder("test-" + i) + .putMapping(mapping1) + .settings( + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ) + ); + } + clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build(); + assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(1)); + writer.writeFullStateAndCommit(0L, clusterState); + + // verify that the on-disk state reflects 1 mapping + hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME)); + assertThat(hashes.size(), equalTo(1)); + assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes)); + + previousState = clusterState; + metadata = Metadata.builder(previousState.metadata()); + + // add a second mapping -- either by adding a new index or changing an existing one + if (randomBoolean()) { + // add another index with a different mapping + 
+                    metadata.put(
+                        IndexMetadata.builder("test-" + 99)
+                            .putMapping(mapping2)
+                            .settings(
+                                Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
+                            )
+                    );
+                } else {
+                    // change an existing index to a different mapping
+                    String index = randomFrom(previousState.metadata().getIndices().keySet());
+                    metadata.put(IndexMetadata.builder(metadata.get(index)).putMapping(mapping2));
+                }
+                clusterState = ClusterState.builder(previousState).metadata(metadata).build();
+                assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(2));
+                writer.writeIncrementalStateAndCommit(0L, previousState, clusterState);
+
+                // verify that the on-disk state reflects 2 mappings
+                hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                assertThat(hashes.size(), equalTo(2));
+                assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes));
+
+                previousState = clusterState;
+                metadata = Metadata.builder(previousState.metadata());
+
+                // update all indices to use the second mapping
+                for (String index : previousState.metadata().getIndices().keySet()) {
+                    metadata.put(IndexMetadata.builder(metadata.get(index)).putMapping(mapping2));
+                }
+                clusterState = ClusterState.builder(previousState).metadata(metadata).build();
+                assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(1));
+                writer.writeIncrementalStateAndCommit(0L, previousState, clusterState);
+
+                // verify that the on-disk state reflects 1 mapping
+                hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                assertThat(hashes.size(), equalTo(1));
+                assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes));
+            }
+        }
+    }
+
+    /**
+     * Search the underlying persisted-state indices for non-deleted mapping documents and, from each document
+     * holding the first page of data, collect and return the distinct mapping hashes.
+     */
+    private static Set<String> loadPersistedMappingHashes(Path metadataDirectory) throws IOException {
+        Set<String> hashes = new HashSet<>();
+        try (Directory directory = new NIOFSDirectory(metadataDirectory); DirectoryReader reader = DirectoryReader.open(directory)) {
+            final IndexSearcher indexSearcher = new IndexSearcher(reader);
+            indexSearcher.setQueryCache(null);
+
+            final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, MAPPING_TYPE_NAME));
+            final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f);
+            for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) {
+                final Scorer scorer = weight.scorer(leafReaderContext);
+                if (scorer != null) {
+                    final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
+                    final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); + int page = document.getField("page").numericValue().intValue(); + if (page == 0) { + String hash = document.getField("mapping_hash").stringValue(); + assertTrue(hashes.add(hash)); + } + } + } + } + } + } + return hashes; + } + private boolean findSegmentInDirectory(Path dataPath) throws IOException { Directory d = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)); @@ -1629,6 +1792,18 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException ); } + private static MappingMetadata randomMappingMetadata(boolean allowNull) { + int i = randomIntBetween(0, 4); + if (i == 0 && allowNull) { + return null; + } else { + return new MappingMetadata( + MapperService.SINGLE_MAPPING_NAME, + Map.of("_doc", Map.of("properties", Map.of("field" + i, "text"))) + ); + } + } + private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException { final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false); return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);