diff --git a/server/src/internalClusterTest/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index c32c9abfa4fc9..3d786a9ccd337 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -30,10 +30,9 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetadata; @@ -44,6 +43,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; +import org.elasticsearch.xcontent.XContentFactory; import java.io.IOException; import java.nio.file.Path; @@ -516,11 +516,11 @@ public void testHalfDeletedIndexImport() throws Exception { final Path[] paths = internalCluster().getInstance(NodeEnvironment.class).nodeDataPaths(); final String nodeId = client().admin().cluster().prepareNodesInfo(nodeName).clear().get().getNodes().get(0).getNode().getId(); - writeBrokenMeta(metaStateService -> { + writeBrokenMeta(nodeEnvironment -> { for (final Path path : paths) { IOUtils.rm(path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME)); } - metaStateService.writeGlobalState("test", Metadata.builder(metadata) + MetaStateWriterUtils.writeGlobalState(nodeEnvironment, "test", Metadata.builder(metadata) // we remove the manifest file, resetting the term and making this look like an upgrade from 6.x, so must also reset the // term in the coordination metadata .coordinationMetadata(CoordinationMetadata.builder(metadata.coordinationMetadata()).term(0L).build()) @@ -534,14 +534,14 @@ public void testHalfDeletedIndexImport() throws Exception { assertBusy(() -> assertThat(internalCluster().getInstance(NodeEnvironment.class).availableIndexFolders(), empty())); } - private void writeBrokenMeta(CheckedConsumer writer) throws Exception { - Map metaStateServices = Stream.of(internalCluster().getNodeNames()) - .collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(MetaStateService.class, nodeName))); + private void writeBrokenMeta(CheckedConsumer writer) throws Exception { + Map nodeEnvironments = Stream.of(internalCluster().getNodeNames()) + .collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(NodeEnvironment.class, nodeName))); internalCluster().fullRestart(new RestartCallback(){ @Override public Settings onNodeStopped(String nodeName) throws Exception { - final MetaStateService metaStateService = metaStateServices.get(nodeName); - writer.accept(metaStateService); + final NodeEnvironment nodeEnvironment = nodeEnvironments.get(nodeName); + writer.accept(nodeEnvironment); return super.onNodeStopped(nodeName); } }); diff --git a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java deleted file mode 100644 index bf25c8931fbff..0000000000000 --- a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ -package org.elasticsearch.gateway; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Manifest; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.LongSupplier; - -/** - * Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata). - */ -public class IncrementalClusterStateWriter { - - private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class); - - private final MetaStateService metaStateService; - - // We call updateClusterState on the (unique) cluster applier thread so there's no need to synchronize access to these fields. - private Manifest previousManifest; - private ClusterState previousClusterState; - private final LongSupplier relativeTimeMillisSupplier; - private boolean incrementalWrite; - - private volatile TimeValue slowWriteLoggingThreshold; - - IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest, - ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) { - this.metaStateService = metaStateService; - this.previousManifest = manifest; - this.previousClusterState = clusterState; - this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; - this.incrementalWrite = false; - this.slowWriteLoggingThreshold = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, - this::setSlowWriteLoggingThreshold); - } - - private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { - this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; - } - - void setCurrentTerm(long currentTerm) throws WriteStateException { - Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(), - new HashMap<>(previousManifest.getIndexGenerations())); - metaStateService.writeManifestAndCleanup("current term changed", manifest); - previousManifest = manifest; - } - - Manifest getPreviousManifest() { - return previousManifest; - } - - void setIncrementalWrite(boolean incrementalWrite) { - this.incrementalWrite = incrementalWrite; - } - - /** - * Updates manifest and meta data on disk. - * - * @param newState new {@link ClusterState} - * - * @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}. - */ - void updateClusterState(ClusterState newState) throws WriteStateException { - Metadata newMetadata = newState.metadata(); - - final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); - - final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest); - long globalStateGeneration = writeGlobalState(writer, newMetadata); - Map indexGenerations = writeIndicesMetadata(writer, newState); - Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations); - writeManifest(writer, manifest); - previousManifest = manifest; - previousClusterState = newState; - - final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; - final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold; - if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) { - logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + - "wrote metadata for [{}] indices and skipped [{}] unchanged indices", - durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped()); - } else { - logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices", - durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped()); - } - } - - private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException { - if (manifest.equals(previousManifest) == false) { - writer.writeManifestAndCleanup("changed", manifest); - } - } - - private Map writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState) - throws WriteStateException { - Map previouslyWrittenIndices = previousManifest.getIndexGenerations(); - Set relevantIndices = getRelevantIndices(newState); - - Map newIndices = new HashMap<>(); - - Metadata previousMetadata = incrementalWrite ? previousClusterState.metadata() : null; - Iterable actions = resolveIndexMetadataActions(previouslyWrittenIndices, relevantIndices, previousMetadata, - newState.metadata()); - - for (IndexMetadataAction action : actions) { - long generation = action.execute(writer); - newIndices.put(action.getIndex(), generation); - } - - return newIndices; - } - - private long writeGlobalState(AtomicClusterStateWriter writer, Metadata newMetadata) throws WriteStateException { - if (incrementalWrite == false || Metadata.isGlobalStateEquals(previousClusterState.metadata(), newMetadata) == false) { - return writer.writeGlobalState("changed", newMetadata); - } - return previousManifest.getGlobalGeneration(); - } - - - /** - * Returns list of {@link IndexMetadataAction} for each relevant index. - * For each relevant index there are 3 options: - *
    - *
  1. - * {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no - * action is required. - *
  2. - *
  3. - * {@link WriteNewIndexMetadata} - there is no index metadata on disk and index metadata for this index should be written. - *
  4. - *
  5. - * {@link WriteChangedIndexMetadata} - index metadata is already on disk, but index metadata version has changed. Updated - * index metadata should be written to disk. - *
  6. - *
- * - * @param previouslyWrittenIndices A list of indices for which the state was already written before - * @param relevantIndices The list of indices for which state should potentially be written - * @param previousMetadata The last meta data we know of - * @param newMetadata The new metadata - * @return list of {@link IndexMetadataAction} for each relevant index. - */ - // exposed for tests - static List resolveIndexMetadataActions(Map previouslyWrittenIndices, - Set relevantIndices, - Metadata previousMetadata, - Metadata newMetadata) { - List actions = new ArrayList<>(); - for (Index index : relevantIndices) { - IndexMetadata newIndexMetadata = newMetadata.getIndexSafe(index); - IndexMetadata previousIndexMetadata = previousMetadata == null ? null : previousMetadata.index(index); - - if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetadata == null) { - actions.add(new WriteNewIndexMetadata(newIndexMetadata)); - } else if (previousIndexMetadata.getVersion() != newIndexMetadata.getVersion()) { - actions.add(new WriteChangedIndexMetadata(previousIndexMetadata, newIndexMetadata)); - } else { - actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index))); - } - } - return actions; - } - - // exposed for tests - static Set getRelevantIndices(ClusterState state) { - assert state.nodes().getLocalNode().canContainData(); - final RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); - if (newRoutingNode == null) { - throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); - } - final Set indices = new HashSet<>(); - for (final ShardRouting routing : newRoutingNode) { - indices.add(routing.index()); - } - return indices; - } - - /** - * Action to perform with index metadata. - */ - interface IndexMetadataAction { - /** - * @return index for index metadata. - */ - Index getIndex(); - - /** - * Executes this action using provided {@link AtomicClusterStateWriter}. - * - * @return new index metadata state generation, to be used in manifest file. - * @throws WriteStateException if exception occurs. - */ - long execute(AtomicClusterStateWriter writer) throws WriteStateException; - } - - /** - * This class is used to write changed global {@link Metadata}, {@link IndexMetadata} and {@link Manifest} to disk. - * This class delegates write* calls to corresponding write calls in {@link MetaStateService} and - * additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails. - */ - static class AtomicClusterStateWriter { - private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished"; - private final List commitCleanupActions; - private final List rollbackCleanupActions; - private final Manifest previousManifest; - private final MetaStateService metaStateService; - private boolean finished; - - private int indicesWritten; - private int indicesSkipped; - - AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) { - this.metaStateService = metaStateService; - assert previousManifest != null; - this.previousManifest = previousManifest; - this.commitCleanupActions = new ArrayList<>(); - this.rollbackCleanupActions = new ArrayList<>(); - this.finished = false; - } - - long writeGlobalState(String reason, Metadata metadata) throws WriteStateException { - assert finished == false : FINISHED_MSG; - try { - rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration())); - long generation = metaStateService.writeGlobalState(reason, metadata); - commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation)); - return generation; - } catch (WriteStateException e) { - rollback(); - throw e; - } - } - - long writeIndex(String reason, IndexMetadata metadata) throws WriteStateException { - assert finished == false : FINISHED_MSG; - try { - Index index = metadata.getIndex(); - Long previousGeneration = previousManifest.getIndexGenerations().get(index); - if (previousGeneration != null) { - // we prefer not to clean-up index metadata in case of rollback, - // if it's not referenced by previous manifest file - // not to break dangling indices functionality - rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration)); - } - long generation = metaStateService.writeIndex(reason, metadata); - commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation)); - return generation; - } catch (WriteStateException e) { - rollback(); - throw e; - } - } - - void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException { - assert finished == false : FINISHED_MSG; - try { - metaStateService.writeManifestAndCleanup(reason, manifest); - commitCleanupActions.forEach(Runnable::run); - finished = true; - } catch (WriteStateException e) { - // If the Manifest write results in a dirty WriteStateException it's not safe to roll back, removing the new metadata files, - // because if the Manifest was actually written to disk and its deletion fails it will reference these new metadata files. - // On master-eligible nodes a dirty WriteStateException here is fatal to the node since we no longer really have any idea - // what the state on disk is and the only sensible response is to start again from scratch. - if (e.isDirty() == false) { - rollback(); - } - throw e; - } - } - - void rollback() { - rollbackCleanupActions.forEach(Runnable::run); - finished = true; - } - - void incrementIndicesWritten() { - indicesWritten++; - } - - void incrementIndicesSkipped() { - indicesSkipped++; - } - - int getIndicesWritten() { - return indicesWritten; - } - - int getIndicesSkipped() { - return indicesSkipped; - } - } - - static class KeepPreviousGeneration implements IndexMetadataAction { - private final Index index; - private final long generation; - - KeepPreviousGeneration(Index index, long generation) { - this.index = index; - this.generation = generation; - } - - @Override - public Index getIndex() { - return index; - } - - @Override - public long execute(AtomicClusterStateWriter writer) { - writer.incrementIndicesSkipped(); - return generation; - } - } - - static class WriteNewIndexMetadata implements IndexMetadataAction { - private final IndexMetadata indexMetadata; - - WriteNewIndexMetadata(IndexMetadata indexMetadata) { - this.indexMetadata = indexMetadata; - } - - @Override - public Index getIndex() { - return indexMetadata.getIndex(); - } - - @Override - public long execute(AtomicClusterStateWriter writer) throws WriteStateException { - writer.incrementIndicesWritten(); - return writer.writeIndex("freshly created", indexMetadata); - } - } - - static class WriteChangedIndexMetadata implements IndexMetadataAction { - private final IndexMetadata newIndexMetadata; - private final IndexMetadata oldIndexMetadata; - - WriteChangedIndexMetadata(IndexMetadata oldIndexMetadata, IndexMetadata newIndexMetadata) { - this.oldIndexMetadata = oldIndexMetadata; - this.newIndexMetadata = newIndexMetadata; - } - - @Override - public Index getIndex() { - return newIndexMetadata.getIndex(); - } - - @Override - public long execute(AtomicClusterStateWriter writer) throws WriteStateException { - writer.incrementIndicesWritten(); - return writer.writeIndex( - "version changed from [" + oldIndexMetadata.getVersion() + "] to [" + newIndexMetadata.getVersion() + "]", - newIndexMetadata); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java b/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java index 5c4c7992de34b..741d14f662fcb 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Manifest; @@ -28,18 +29,14 @@ import java.util.function.Predicate; /** - * Handles writing and loading {@link Manifest}, {@link Metadata} and {@link IndexMetadata} + * Handles writing and loading {@link Manifest}, {@link Metadata} and {@link IndexMetadata} as used for cluster state persistence in + * versions prior to {@link Version#V_7_6_0}, used to read this older format during an upgrade from these versions. */ public class MetaStateService { private static final Logger logger = LogManager.getLogger(MetaStateService.class); - private final NodeEnvironment nodeEnv; - private final NamedXContentRegistry namedXContentRegistry; - - // we allow subclasses in tests to redefine formats, e.g. to inject failures - protected MetadataStateFormat METADATA_FORMAT = Metadata.FORMAT; - protected MetadataStateFormat INDEX_METADATA_FORMAT = IndexMetadata.FORMAT; - protected MetadataStateFormat MANIFEST_FORMAT = Manifest.FORMAT; + public final NodeEnvironment nodeEnv; + public final NamedXContentRegistry namedXContentRegistry; public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) { this.nodeEnv = nodeEnv; @@ -58,7 +55,7 @@ public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXCon * @throws IOException if some IOException when loading files occurs or there is no metadata referenced by manifest file. */ public Tuple loadFullState() throws IOException { - final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); + final Manifest manifest = Manifest.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); if (manifest == null) { return loadFullStateBWC(); } @@ -67,7 +64,7 @@ public Tuple loadFullState() throws IOException { if (manifest.isGlobalGenerationMissing()) { metadataBuilder = Metadata.builder(); } else { - final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(), + final Metadata globalMetadata = Metadata.FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(), nodeEnv.nodeDataPaths()); if (globalMetadata != null) { metadataBuilder = Metadata.builder(globalMetadata); @@ -80,7 +77,7 @@ public Tuple loadFullState() throws IOException { final Index index = entry.getKey(); final long generation = entry.getValue(); final String indexFolderName = index.getUUID(); - final IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation, + final IndexMetadata indexMetadata = IndexMetadata.FORMAT.loadGeneration(logger, namedXContentRegistry, generation, nodeEnv.resolveIndexFolder(indexFolderName)); if (indexMetadata != null) { metadataBuilder.put(indexMetadata, false); @@ -101,7 +98,7 @@ private Tuple loadFullStateBWC() throws IOException { Metadata.Builder metadataBuilder; Tuple metadataAndGeneration = - METADATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); + Metadata.FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); Metadata globalMetadata = metadataAndGeneration.v1(); long globalStateGeneration = metadataAndGeneration.v2(); @@ -116,7 +113,7 @@ private Tuple loadFullStateBWC() throws IOException { for (String indexFolderName : nodeEnv.availableIndexFolders()) { Tuple indexMetadataAndGeneration = - INDEX_METADATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, + IndexMetadata.FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.resolveIndexFolder(indexFolderName)); IndexMetadata indexMetadata = indexMetadataAndGeneration.v1(); long generation = indexMetadataAndGeneration.v2(); @@ -142,7 +139,7 @@ private Tuple loadFullStateBWC() throws IOException { */ @Nullable public IndexMetadata loadIndexState(Index index) throws IOException { - return INDEX_METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.indexPaths(index)); + return IndexMetadata.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.indexPaths(index)); } /** @@ -153,7 +150,7 @@ List loadIndicesStates(Predicate excludeIndexPathIdsPredi for (String indexFolderName : nodeEnv.availableIndexFolders(excludeIndexPathIdsPredicate)) { assert excludeIndexPathIdsPredicate.test(indexFolderName) == false : "unexpected folder " + indexFolderName + " which should have been excluded"; - IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, + IndexMetadata indexMetadata = IndexMetadata.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.resolveIndexFolder(indexFolderName)); if (indexMetadata != null) { final String indexPathId = indexMetadata.getIndex().getUUID(); @@ -173,80 +170,7 @@ List loadIndicesStates(Predicate excludeIndexPathIdsPredi * Loads the global state, *without* index state, see {@link #loadFullState()} for that. */ Metadata loadGlobalState() throws IOException { - return METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); - } - - /** - * Writes manifest file (represented by {@link Manifest}) to disk and performs cleanup of old manifest state file if - * the write succeeds or newly created manifest state if the write fails. - * - * @throws WriteStateException if exception when writing state occurs. See also {@link WriteStateException#isDirty()} - */ - public void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException { - logger.trace("[_meta] writing state, reason [{}]", reason); - try { - long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths()); - logger.trace("[_meta] state written (generation: {})", generation); - } catch (WriteStateException ex) { - throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex); - } - } - - /** - * Writes the index state. - *

- * This method is public for testing purposes. - * - * @throws WriteStateException if exception when writing state occurs. {@link WriteStateException#isDirty()} will always return - * false, because new index state file is not yet referenced by manifest file. - */ - public long writeIndex(String reason, IndexMetadata indexMetadata) throws WriteStateException { - final Index index = indexMetadata.getIndex(); - logger.trace("[{}] writing state, reason [{}]", index, reason); - try { - long generation = INDEX_METADATA_FORMAT.write(indexMetadata, - nodeEnv.indexPaths(indexMetadata.getIndex())); - logger.trace("[{}] state written", index); - return generation; - } catch (WriteStateException ex) { - throw new WriteStateException(false, "[" + index + "]: failed to write index state", ex); - } - } - - /** - * Writes the global state, *without* the indices states. - * - * @throws WriteStateException if exception when writing state occurs. {@link WriteStateException#isDirty()} will always return - * false, because new global state file is not yet referenced by manifest file. - */ - long writeGlobalState(String reason, Metadata metadata) throws WriteStateException { - logger.trace("[_global] writing state, reason [{}]", reason); - try { - long generation = METADATA_FORMAT.write(metadata, nodeEnv.nodeDataPaths()); - logger.trace("[_global] state written"); - return generation; - } catch (WriteStateException ex) { - throw new WriteStateException(false, "[_global]: failed to write global state", ex); - } - } - - /** - * Removes old state files in global state directory. - * - * @param currentGeneration current state generation to keep in the directory. - */ - void cleanupGlobalState(long currentGeneration) { - METADATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPaths()); - } - - /** - * Removes old state files in index directory. - * - * @param index index to perform clean up on. - * @param currentGeneration current state generation to keep in the index directory. - */ - public void cleanupIndex(Index index, long currentGeneration) { - INDEX_METADATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.indexPaths(index)); + return Metadata.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); } /** @@ -254,8 +178,8 @@ public void cleanupIndex(Index index, long currentGeneration) { * (only used for dangling indices at that point). */ public void unreferenceAll() throws IOException { - MANIFEST_FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPaths()); // write empty file so that indices become unreferenced - METADATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); + Manifest.FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPaths()); // write empty file so that indices become unreferenced + Metadata.FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); } /** @@ -270,6 +194,7 @@ public void deleteAll() throws IOException { // delete meta state directories of indices MetadataStateFormat.deleteMetaState(nodeEnv.resolveIndexFolder(indexFolderName)); } - MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); // finally delete manifest + Manifest.FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); // finally delete manifest } } + diff --git a/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java b/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java index ec9553c415df3..a77f965ac77fb 100644 --- a/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java @@ -51,7 +51,7 @@ public void testDanglingIndicesAreDiscovered() throws Exception { final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"); IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build(); - metaStateService.writeIndex("test_write", dangledIndex); + MetaStateWriterUtils.writeIndex(env, "test_write", dangledIndex); Map newDanglingIndices = danglingState.getDanglingIndices(); assertTrue(newDanglingIndices.containsKey(dangledIndex.getIndex())); @@ -68,7 +68,7 @@ public void testInvalidIndexFolder() throws Exception { final String uuid = "test1UUID"; final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, uuid); IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build(); - metaStateService.writeIndex("test_write", dangledIndex); + MetaStateWriterUtils.writeIndex(env, "test_write", dangledIndex); for (Path path : env.resolveIndexFolder(uuid)) { if (Files.exists(path)) { Files.move(path, path.resolveSibling("invalidUUID"), StandardCopyOption.ATOMIC_MOVE); @@ -86,7 +86,7 @@ public void testDanglingIndicesNotReportedWhenTombstonePresent() throws Exceptio final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"); IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build(); - metaStateService.writeIndex("test_write", dangledIndex); + MetaStateWriterUtils.writeIndex(env, "test_write", dangledIndex); final IndexGraveyard graveyard = IndexGraveyard.builder().addTombstone(dangledIndex.getIndex()).build(); final Metadata metadata = Metadata.builder().indexGraveyard(graveyard).build(); @@ -106,14 +106,14 @@ public void testDanglingIndicesReportedWhenIndexNameIsAlreadyUsed() throws Excep .put(indexSettings) .put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"); IndexMetadata dangledIndex = IndexMetadata.builder("test_index").settings(danglingSettings).build(); - metaStateService.writeIndex("test_write", dangledIndex); + MetaStateWriterUtils.writeIndex(env, "test_write", dangledIndex); // Build another index with the same name but a different UUID final Settings.Builder existingSettings = Settings.builder() .put(indexSettings) .put(IndexMetadata.SETTING_INDEX_UUID, "test2UUID"); IndexMetadata existingIndex = IndexMetadata.builder("test_index").settings(existingSettings).build(); - metaStateService.writeIndex("test_write", existingIndex); + MetaStateWriterUtils.writeIndex(env, "test_write", existingIndex); final ImmutableOpenMap indices = ImmutableOpenMap.builder() .fPut(dangledIndex.getIndex().getName(), existingIndex) @@ -137,7 +137,7 @@ public void testDanglingIndicesStripAliases() throws Exception { .settings(settings) .putAlias(AliasMetadata.newAliasMetadataBuilder("test_aliasd").build()) .build(); - metaStateService.writeIndex("test_write", dangledIndex); + MetaStateWriterUtils.writeIndex(env, "test_write", dangledIndex); assertThat(dangledIndex.getAliases().size(), equalTo(1)); final Metadata metadata = Metadata.builder().build(); diff --git a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java deleted file mode 100644 index e2456a0b1540e..0000000000000 --- a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java +++ /dev/null @@ -1,509 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ -package org.elasticsearch.gateway; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.MockDirectoryWrapper; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Manifest; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.MetadataIndexStateService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.Index; -import org.elasticsearch.test.MockLogAppender; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.mockito.ArgumentCaptor; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThan; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { - - private ClusterState clusterStateWithUnassignedIndex(IndexMetadata indexMetadata, boolean masterEligible) { - Metadata metadata = Metadata.builder() - .put(indexMetadata, false) - .build(); - - RoutingTable routingTable = RoutingTable.builder() - .addAsNew(metadata.index("test")) - .build(); - - return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - } - - private ClusterState clusterStateWithAssignedIndex(IndexMetadata indexMetadata, boolean masterEligible) { - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 100) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) - .build()); - - ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetadata, masterEligible); - RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable(); - - Metadata metadataNewClusterState = Metadata.builder() - .put(oldClusterState.metadata().index("test"), false) - .build(); - - return ClusterState.builder(oldClusterState).routingTable(routingTable) - .metadata(metadataNewClusterState).version(oldClusterState.getVersion() + 1).build(); - } - - private ClusterState clusterStateWithNonReplicatedClosedIndex(IndexMetadata indexMetadata, boolean masterEligible) { - ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetadata, masterEligible); - - Metadata metadataNewClusterState = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).state(IndexMetadata.State.CLOSE) - .numberOfShards(5).numberOfReplicas(2)) - .version(oldClusterState.metadata().version() + 1) - .build(); - RoutingTable routingTable = RoutingTable.builder() - .addAsRecovery(metadataNewClusterState.index("test")) - .build(); - - return ClusterState.builder(oldClusterState).routingTable(routingTable) - .metadata(metadataNewClusterState).version(oldClusterState.getVersion() + 1).build(); - } - - private ClusterState clusterStateWithReplicatedClosedIndex(IndexMetadata indexMetadata, boolean masterEligible, boolean assigned) { - ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetadata, masterEligible); - - Metadata metadataNewClusterState = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT) - .put(MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey(), true)) - .state(IndexMetadata.State.CLOSE) - .numberOfShards(5).numberOfReplicas(2)) - .version(oldClusterState.metadata().version() + 1) - .build(); - RoutingTable routingTable = RoutingTable.builder() - .addAsRecovery(metadataNewClusterState.index("test")) - .build(); - - oldClusterState = ClusterState.builder(oldClusterState).routingTable(routingTable) - .metadata(metadataNewClusterState).build(); - if (assigned) { - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 100) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) - .build()); - - routingTable = strategy.reroute(oldClusterState, "reroute").routingTable(); - } - - return ClusterState.builder(oldClusterState).routingTable(routingTable) - .metadata(metadataNewClusterState).version(oldClusterState.getVersion() + 1).build(); - } - - private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) { - Set dataOnlyRoles = Collections.singleton(DiscoveryNodeRole.DATA_ROLE); - return DiscoveryNodes.builder().add(newNode("node1", masterEligible ? MASTER_DATA_ROLES : dataOnlyRoles)) - .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node"); - } - - private IndexMetadata createIndexMetadata(String name) { - return IndexMetadata.builder(name). - settings(settings(Version.CURRENT)). - numberOfShards(5). - numberOfReplicas(2). - build(); - } - - public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() { - IndexMetadata indexMetadata = createIndexMetadata("test"); - Set indices = IncrementalClusterStateWriter.getRelevantIndices(clusterStateWithUnassignedIndex(indexMetadata, true)); - assertThat(indices.size(), equalTo(0)); - } - - public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() { - IndexMetadata indexMetadata = createIndexMetadata("test"); - Set indices = IncrementalClusterStateWriter.getRelevantIndices(clusterStateWithUnassignedIndex(indexMetadata, false)); - assertThat(indices.size(), equalTo(0)); - } - - public void testGetRelevantIndicesWithAssignedShards() { - IndexMetadata indexMetadata = createIndexMetadata("test"); - boolean masterEligible = randomBoolean(); - Set indices = IncrementalClusterStateWriter.getRelevantIndices(clusterStateWithAssignedIndex(indexMetadata, masterEligible)); - assertThat(indices.size(), equalTo(1)); - } - - public void testGetRelevantIndicesForNonReplicatedClosedIndexOnDataOnlyNode() { - IndexMetadata indexMetadata = createIndexMetadata("test"); - Set indices = IncrementalClusterStateWriter.getRelevantIndices( - clusterStateWithNonReplicatedClosedIndex(indexMetadata, false)); - assertThat(indices.size(), equalTo(0)); - } - - public void testGetRelevantIndicesForReplicatedClosedButUnassignedIndexOnDataOnlyNode() { - IndexMetadata indexMetadata = createIndexMetadata("test"); - Set indices = IncrementalClusterStateWriter.getRelevantIndices( - clusterStateWithReplicatedClosedIndex(indexMetadata, false, false)); - assertThat(indices.size(), equalTo(0)); - } - - public void testGetRelevantIndicesForReplicatedClosedAndAssignedIndexOnDataOnlyNode() { - IndexMetadata indexMetadata = createIndexMetadata("test"); - Set indices = IncrementalClusterStateWriter.getRelevantIndices( - clusterStateWithReplicatedClosedIndex(indexMetadata, false, true)); - assertThat(indices.size(), equalTo(1)); - } - - public void testResolveStatesToBeWritten() throws WriteStateException { - Map indices = new HashMap<>(); - Set relevantIndices = new HashSet<>(); - - IndexMetadata removedIndex = createIndexMetadata("removed_index"); - indices.put(removedIndex.getIndex(), 1L); - - IndexMetadata versionChangedIndex = createIndexMetadata("version_changed_index"); - indices.put(versionChangedIndex.getIndex(), 2L); - relevantIndices.add(versionChangedIndex.getIndex()); - - IndexMetadata notChangedIndex = createIndexMetadata("not_changed_index"); - indices.put(notChangedIndex.getIndex(), 3L); - relevantIndices.add(notChangedIndex.getIndex()); - - IndexMetadata newIndex = createIndexMetadata("new_index"); - relevantIndices.add(newIndex.getIndex()); - - Metadata oldMetadata = Metadata.builder() - .put(removedIndex, false) - .put(versionChangedIndex, false) - .put(notChangedIndex, false) - .build(); - - Metadata newMetadata = Metadata.builder() - .put(versionChangedIndex, true) - .put(notChangedIndex, false) - .put(newIndex, false) - .build(); - - IndexMetadata newVersionChangedIndex = newMetadata.index(versionChangedIndex.getIndex()); - - List actions = - IncrementalClusterStateWriter.resolveIndexMetadataActions(indices, relevantIndices, oldMetadata, newMetadata); - - assertThat(actions, hasSize(3)); - - boolean keptPreviousGeneration = false; - boolean wroteNewIndex = false; - boolean wroteChangedIndex = false; - - for (IncrementalClusterStateWriter.IndexMetadataAction action : actions) { - if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) { - assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex())); - IncrementalClusterStateWriter.AtomicClusterStateWriter writer - = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); - assertThat(action.execute(writer), equalTo(3L)); - verify(writer, times(1)).incrementIndicesSkipped(); - verifyNoMoreInteractions(writer); - keptPreviousGeneration = true; - } - if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetadata) { - assertThat(action.getIndex(), equalTo(newIndex.getIndex())); - IncrementalClusterStateWriter.AtomicClusterStateWriter writer - = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); - when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L); - assertThat(action.execute(writer), equalTo(0L)); - verify(writer, times(1)).incrementIndicesWritten(); - wroteNewIndex = true; - } - if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetadata) { - assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex())); - IncrementalClusterStateWriter.AtomicClusterStateWriter writer - = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); - when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L); - assertThat(action.execute(writer), equalTo(3L)); - ArgumentCaptor reason = ArgumentCaptor.forClass(String.class); - verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex)); - verify(writer, times(1)).incrementIndicesWritten(); - assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion()))); - assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion()))); - wroteChangedIndex = true; - } - } - - assertTrue(keptPreviousGeneration); - assertTrue(wroteNewIndex); - assertTrue(wroteChangedIndex); - } - - private static class MetaStateServiceWithFailures extends MetaStateService { - private final int invertedFailRate; - private boolean failRandomly; - - private MetadataStateFormat wrap(MetadataStateFormat format) { - return new MetadataStateFormat(format.getPrefix()) { - @Override - public void toXContent(XContentBuilder builder, T state) throws IOException { - format.toXContent(builder, state); - } - - @Override - public T fromXContent(XContentParser parser) throws IOException { - return format.fromXContent(parser); - } - - @Override - protected Directory newDirectory(Path dir) { - MockDirectoryWrapper mock = newMockFSDirectory(dir); - if (failRandomly) { - MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { - @Override - public void eval(MockDirectoryWrapper dir) throws IOException { - int r = randomIntBetween(0, invertedFailRate); - if (r == 0) { - throw new MockDirectoryWrapper.FakeIOException(); - } - } - }; - mock.failOn(fail); - } - closeAfterSuite(mock); - return mock; - } - }; - } - - MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) { - super(nodeEnv, namedXContentRegistry); - METADATA_FORMAT = wrap(Metadata.FORMAT); - INDEX_METADATA_FORMAT = wrap(IndexMetadata.FORMAT); - MANIFEST_FORMAT = wrap(Manifest.FORMAT); - failRandomly = false; - this.invertedFailRate = invertedFailRate; - } - - void failRandomly() { - failRandomly = true; - } - - void noFailures() { - failRandomly = false; - } - } - - private boolean metadataEquals(Metadata md1, Metadata md2) { - boolean equals = Metadata.isGlobalStateEquals(md1, md2); - - for (IndexMetadata imd : md1) { - IndexMetadata imd2 = md2.index(imd.getIndex()); - equals = equals && imd.equals(imd2); - } - - for (IndexMetadata imd : md2) { - IndexMetadata imd2 = md1.index(imd.getIndex()); - equals = equals && imd.equals(imd2); - } - return equals; - } - - private static Metadata randomMetadataForTx() { - int settingNo = randomIntBetween(0, 10); - Metadata.Builder builder = Metadata.builder() - .persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build()); - int numOfIndices = randomIntBetween(0, 3); - - for (int i = 0; i < numOfIndices; i++) { - int indexNo = randomIntBetween(0, 50); - IndexMetadata indexMetadata = IndexMetadata.builder("index" + indexNo).settings( - Settings.builder() - .put(IndexMetadata.SETTING_INDEX_UUID, "index" + indexNo) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .build() - ).build(); - builder.put(indexMetadata, false); - } - return builder.build(); - } - - public void testAtomicityWithFailures() throws IOException { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateServiceWithFailures metaStateService = - new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry()); - - // We only guarantee atomicity of writes, if there is initial Manifest file - Manifest manifest = Manifest.empty(); - Metadata metadata = Metadata.EMPTY_METADATA; - metaStateService.writeManifestAndCleanup("startup", Manifest.empty()); - long currentTerm = randomNonNegativeLong(); - long clusterStateVersion = randomNonNegativeLong(); - - metaStateService.failRandomly(); - Set possibleMetadata = new HashSet<>(); - possibleMetadata.add(metadata); - - for (int i = 0; i < randomIntBetween(1, 5); i++) { - IncrementalClusterStateWriter.AtomicClusterStateWriter writer = - new IncrementalClusterStateWriter.AtomicClusterStateWriter(metaStateService, manifest); - metadata = randomMetadataForTx(); - Map indexGenerations = new HashMap<>(); - - try { - long globalGeneration = writer.writeGlobalState("global", metadata); - - for (IndexMetadata indexMetadata : metadata) { - long generation = writer.writeIndex("index", indexMetadata); - indexGenerations.put(indexMetadata.getIndex(), generation); - } - - Manifest newManifest = new Manifest(currentTerm, clusterStateVersion, globalGeneration, indexGenerations); - writer.writeManifestAndCleanup("manifest", newManifest); - possibleMetadata.clear(); - possibleMetadata.add(metadata); - manifest = newManifest; - } catch (WriteStateException e) { - if (e.isDirty()) { - possibleMetadata.add(metadata); - /* - * If dirty WriteStateException occurred, it's only safe to proceed if there is subsequent - * successful write of metadata and Manifest. We prefer to break here, not to over complicate test logic. - * See also MetadataStateFormat#testFailRandomlyAndReadAnyState, that does not break. - */ - break; - } - } - } - - metaStateService.noFailures(); - - Tuple manifestAndMetadata = metaStateService.loadFullState(); - Metadata loadedMetadata = manifestAndMetadata.v2(); - - assertTrue(possibleMetadata.stream().anyMatch(md -> metadataEquals(md, loadedMetadata))); - } - } - - @TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level") - public void testSlowLogging() throws WriteStateException, IllegalAccessException { - final long slowWriteLoggingThresholdMillis; - final Settings settings; - if (randomBoolean()) { - slowWriteLoggingThresholdMillis = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis(); - settings = Settings.EMPTY; - } else { - slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000); - settings = Settings.builder() - .put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms") - .build(); - } - - final DiscoveryNode localNode = newNode("node"); - final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); - - final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10); - final AtomicLong currentTime = new AtomicLong(startTimeMillis); - final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis); - - final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - final IncrementalClusterStateWriter incrementalClusterStateWriter - = new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class), - new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()), - clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get())); - - assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( - "should see warning at threshold", - IncrementalClusterStateWriter.class.getCanonicalName(), - Level.WARN, - "writing cluster state took [*] which is above the warn threshold of [*]; " + - "wrote metadata for [0] indices and skipped [0] unchanged indices")); - - writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2)); - assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( - "should see warning above threshold", - IncrementalClusterStateWriter.class.getCanonicalName(), - Level.WARN, - "writing cluster state took [*] which is above the warn threshold of [*]; " + - "wrote metadata for [0] indices and skipped [0] unchanged indices")); - - writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1)); - assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation( - "should not see warning below threshold", - IncrementalClusterStateWriter.class.getCanonicalName(), - Level.WARN, - "*")); - - clusterSettings.applySettings(Settings.builder() - .put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms") - .build()); - assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( - "should see warning at reduced threshold", - IncrementalClusterStateWriter.class.getCanonicalName(), - Level.WARN, - "writing cluster state took [*] which is above the warn threshold of [*]; " + - "wrote metadata for [0] indices and skipped [0] unchanged indices")); - - assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow - } - - private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter, - MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException { - MockLogAppender mockAppender = new MockLogAppender(); - mockAppender.start(); - mockAppender.addExpectation(expectation); - Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class); - Loggers.addAppender(classLogger, mockAppender); - - try { - incrementalClusterStateWriter.updateClusterState(clusterState); - } finally { - Loggers.removeAppender(classLogger, mockAppender); - mockAppender.stop(); - } - mockAppender.assertAllExpectationsMatched(); - } -} diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java index 1050c35e9a797..e25c747a49226 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java @@ -56,7 +56,7 @@ private static IndexMetadata indexMetadata(String name) { public void testWriteLoadIndex() throws Exception { IndexMetadata index = indexMetadata("test1"); - metaStateService.writeIndex("test_write", index); + MetaStateWriterUtils.writeIndex(env, "test_write", index); assertThat(metaStateService.loadIndexState(index.getIndex()), equalTo(index)); } @@ -68,7 +68,7 @@ public void testWriteLoadGlobal() throws Exception { Metadata metadata = Metadata.builder() .persistentSettings(Settings.builder().put("test1", "value1").build()) .build(); - metaStateService.writeGlobalState("test_write", metadata); + MetaStateWriterUtils.writeGlobalState(env, "test_write", metadata); assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metadata.persistentSettings())); } @@ -79,7 +79,7 @@ public void testWriteGlobalStateWithIndexAndNoIndexIsLoaded() throws Exception { IndexMetadata index = indexMetadata("test1"); Metadata metadataWithIndex = Metadata.builder(metadata).put(index, true).build(); - metaStateService.writeGlobalState("test_write", metadataWithIndex); + MetaStateWriterUtils.writeGlobalState(env, "test_write", metadataWithIndex); assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metadata.persistentSettings())); assertThat(metaStateService.loadGlobalState().hasIndex("test1"), equalTo(false)); } @@ -91,8 +91,8 @@ public void testLoadFullStateBWC() throws Exception { .put(indexMetadata, true) .build(); - long globalGeneration = metaStateService.writeGlobalState("test_write", metadata); - long indexGeneration = metaStateService.writeIndex("test_write", indexMetadata); + long globalGeneration = MetaStateWriterUtils.writeGlobalState(env, "test_write", metadata); + long indexGeneration = MetaStateWriterUtils.writeIndex(env, "test_write", indexMetadata); Tuple manifestAndMetadata = metaStateService.loadFullState(); Manifest manifest = manifestAndMetadata.v1(); @@ -118,7 +118,7 @@ public void testLoadEmptyStateNoManifest() throws IOException { public void testLoadEmptyStateWithManifest() throws IOException { Manifest manifest = Manifest.empty(); - metaStateService.writeManifestAndCleanup("test", manifest); + MetaStateWriterUtils.writeManifestAndCleanup(env, "test", manifest); Tuple manifestAndMetadata = metaStateService.loadFullState(); assertTrue(manifestAndMetadata.v1().isEmpty()); @@ -128,13 +128,13 @@ public void testLoadEmptyStateWithManifest() throws IOException { public void testLoadFullStateMissingGlobalMetadata() throws IOException { IndexMetadata index = indexMetadata("test1"); - long indexGeneration = metaStateService.writeIndex("test", index); + long indexGeneration = MetaStateWriterUtils.writeIndex(env, "test", index); Manifest manifest = new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), Manifest.empty().getGlobalGeneration(), new HashMap() {{ put(index.getIndex(), indexGeneration); }}); assertTrue(manifest.isGlobalGenerationMissing()); - metaStateService.writeManifestAndCleanup("test", manifest); + MetaStateWriterUtils.writeManifestAndCleanup(env, "test", manifest); Tuple manifestAndMetadata = metaStateService.loadFullState(); assertThat(manifestAndMetadata.v1(), equalTo(manifest)); @@ -151,20 +151,20 @@ public void testLoadFullStateAndUpdateAndClean() throws IOException { .put(index, true) .build(); - long globalGeneration = metaStateService.writeGlobalState("first global state write", metadata); - long indexGeneration = metaStateService.writeIndex("first index state write", index); + long globalGeneration = MetaStateWriterUtils.writeGlobalState(env, "first global state write", metadata); + long indexGeneration = MetaStateWriterUtils.writeIndex(env, "first index state write", index); Manifest manifest = new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), globalGeneration, new HashMap() {{ put(index.getIndex(), indexGeneration); }}); - metaStateService.writeManifestAndCleanup("first manifest write", manifest); + MetaStateWriterUtils.writeManifestAndCleanup(env, "first manifest write", manifest); Metadata newMetadata = Metadata.builder() .persistentSettings(Settings.builder().put("test1", "value2").build()) .put(index, true) .build(); - globalGeneration = metaStateService.writeGlobalState("second global state write", newMetadata); + globalGeneration = MetaStateWriterUtils.writeGlobalState(env, "second global state write", newMetadata); Tuple manifestAndMetadata = metaStateService.loadFullState(); assertThat(manifestAndMetadata.v1(), equalTo(manifest)); @@ -179,9 +179,9 @@ public void testLoadFullStateAndUpdateAndClean() throws IOException { put(index.getIndex(), indexGeneration); }}); - metaStateService.writeManifestAndCleanup("second manifest write", manifest); - metaStateService.cleanupGlobalState(globalGeneration); - metaStateService.cleanupIndex(index.getIndex(), indexGeneration); + MetaStateWriterUtils.writeManifestAndCleanup(env, "second manifest write", manifest); + Metadata.FORMAT.cleanupOldFiles(globalGeneration, env.nodeDataPaths()); + IndexMetadata.FORMAT.cleanupOldFiles(indexGeneration, env.indexPaths(index.getIndex())); manifestAndMetadata = metaStateService.loadFullState(); assertThat(manifestAndMetadata.v1(), equalTo(manifest)); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index 441063c1a0fdf..080382e7aecdd 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -31,8 +31,8 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.gateway.MetaStateWriterUtils; import org.elasticsearch.gateway.LocalAllocateDangledIndices; -import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; @@ -326,7 +326,6 @@ public void testVerifyIfIndexContentDeleted() throws Exception { final Index index = new Index("test", UUIDs.randomBase64UUID()); final IndicesService indicesService = getIndicesService(); final NodeEnvironment nodeEnv = getNodeEnvironment(); - final MetaStateService metaStateService = getInstanceFromNode(MetaStateService.class); final ClusterService clusterService = getInstanceFromNode(ClusterService.class); final Settings idxSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) @@ -337,7 +336,8 @@ public void testVerifyIfIndexContentDeleted() throws Exception { .numberOfShards(1) .numberOfReplicas(0) .build(); - metaStateService.writeIndex("test index being created", indexMetadata); + + MetaStateWriterUtils.writeIndex(nodeEnv, "test index being created", indexMetadata); final Metadata metadata = Metadata.builder(clusterService.state().metadata()).put(indexMetadata, true).build(); final ClusterState csWithIndex = new ClusterState.Builder(clusterService.state()).metadata(metadata).build(); try { diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MetaStateWriterUtils.java b/test/framework/src/main/java/org/elasticsearch/gateway/MetaStateWriterUtils.java new file mode 100644 index 0000000000000..a6d1a7f0ec388 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MetaStateWriterUtils.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.gateway; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Manifest; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; + +/** + * Maintains the method of writing cluster states to disk for versions prior to {@link Version#V_7_6_0}, preserved to test the classes that + * read this state during an upgrade from these older versions. + */ +public class MetaStateWriterUtils { + private static final Logger logger = LogManager.getLogger(MetaStateWriterUtils.class); + + private MetaStateWriterUtils() { + throw new AssertionError("static methods only"); + } + + /** + * Writes manifest file (represented by {@link Manifest}) to disk and performs cleanup of old manifest state file if + * the write succeeds or newly created manifest state if the write fails. + * + * @throws WriteStateException if exception when writing state occurs. See also {@link WriteStateException#isDirty()} + */ + public static void writeManifestAndCleanup(NodeEnvironment nodeEnv, String reason, Manifest manifest) throws WriteStateException { + logger.trace("[_meta] writing state, reason [{}]", reason); + try { + long generation = Manifest.FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths()); + logger.trace("[_meta] state written (generation: {})", generation); + } catch (WriteStateException ex) { + throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex); + } + } + + /** + * Writes the index state. + *

+ * This method is public for testing purposes. + * + * @throws WriteStateException if exception when writing state occurs. {@link WriteStateException#isDirty()} will always return + * false, because new index state file is not yet referenced by manifest file. + */ + public static long writeIndex(NodeEnvironment nodeEnv, String reason, IndexMetadata indexMetadata) throws WriteStateException { + final Index index = indexMetadata.getIndex(); + logger.trace("[{}] writing state, reason [{}]", index, reason); + try { + long generation = IndexMetadata.FORMAT.write(indexMetadata, + nodeEnv.indexPaths(indexMetadata.getIndex())); + logger.trace("[{}] state written", index); + return generation; + } catch (WriteStateException ex) { + throw new WriteStateException(false, "[" + index + "]: failed to write index state", ex); + } + } + + /** + * Writes the global state, *without* the indices states. + * + * @throws WriteStateException if exception when writing state occurs. {@link WriteStateException#isDirty()} will always return + * false, because new global state file is not yet referenced by manifest file. + */ + static long writeGlobalState(NodeEnvironment nodeEnv, String reason, Metadata metadata) throws WriteStateException { + logger.trace("[_global] writing state, reason [{}]", reason); + try { + long generation = Metadata.FORMAT.write(metadata, nodeEnv.nodeDataPaths()); + logger.trace("[_global] state written"); + return generation; + } catch (WriteStateException ex) { + throw new WriteStateException(false, "[_global]: failed to write global state", ex); + } + } + +}