diff --git a/qa/os/src/test/java/org/elasticsearch/packaging/test/ArchiveTests.java b/qa/os/src/test/java/org/elasticsearch/packaging/test/ArchiveTests.java
index 041b1791ee4f4..9691636f5ccda 100644
--- a/qa/os/src/test/java/org/elasticsearch/packaging/test/ArchiveTests.java
+++ b/qa/os/src/test/java/org/elasticsearch/packaging/test/ArchiveTests.java
@@ -27,7 +27,6 @@
 import org.elasticsearch.packaging.util.ServerUtils;
 import org.elasticsearch.packaging.util.Shell.Result;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -383,7 +382,6 @@ public void test92ElasticsearchNodeCliPackaging() throws Exception {
         }
     }
 
-    @Ignore("https://github.com/elastic/elasticsearch/issues/48701") // TODO unsafe bootstrapping
     public void test93ElasticsearchNodeCustomDataPathAndNotEsHomeWorkDir() throws Exception {
         Path relativeDataPath = installation.data.relativize(installation.home);
         append(installation.config("elasticsearch.yml"), "path.data: " + relativeDataPath);
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java
index dff7ae5a2ee03..527a1b3f69cb3 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java
@@ -18,11 +18,12 @@
  */
 package org.elasticsearch.cluster.coordination;
 
+import joptsimple.OptionSet;
 import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.cluster.metadata.Manifest;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.gateway.PersistedClusterStateService;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -48,14 +49,21 @@ public DetachClusterCommand() {
 
     @Override
-    protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
-        final Tuple<Manifest, MetaData> manifestMetaDataTuple = loadMetaData(terminal, dataPaths);
-        final Manifest manifest = manifestMetaDataTuple.v1();
-        final MetaData metaData = manifestMetaDataTuple.v2();
+    protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
+        final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
+
+        terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
+        final ClusterState oldClusterState = loadTermAndClusterState(persistedClusterStateService, env).v2();
+        final ClusterState newClusterState = ClusterState.builder(oldClusterState)
+            .metaData(updateMetaData(oldClusterState.metaData())).build();
+        terminal.println(Terminal.Verbosity.VERBOSE,
+            "[old cluster state = " + oldClusterState + ", new cluster state = " + newClusterState + "]");
 
         confirm(terminal, CONFIRMATION_MSG);
 
-        writeNewMetaData(terminal, manifest, updateCurrentTerm(), metaData, updateMetaData(metaData), dataPaths);
+        try (PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter()) {
+            writer.writeFullStateAndCommit(updateCurrentTerm(), newClusterState);
+        }
 
         terminal.println(NODE_DETACHED_MSG);
     }
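Note: the rewritten detach flow above reduces to load, modify, confirm, rewrite against the Lucene-backed store. A minimal standalone sketch of that sequence; the State and Writer types below are simplified stand-ins for the real Elasticsearch classes, and bumping the term inline stands in for updateCurrentTerm():

    import java.io.Closeable;
    import java.io.IOException;

    final class DetachFlowSketch {
        /** Simplified stand-in for the on-disk state: a term plus an opaque metadata payload. */
        static final class State {
            final long currentTerm;
            final String metaData;
            State(long currentTerm, String metaData) {
                this.currentTerm = currentTerm;
                this.metaData = metaData;
            }
        }

        /** Simplified stand-in for PersistedClusterStateService.Writer. */
        interface Writer extends Closeable {
            void writeFullStateAndCommit(long currentTerm, State state) throws IOException;
        }

        static void detach(State oldState, Writer writer) throws IOException {
            // 1. derive the new state from the old one (updateMetaData in the real command)
            State newState = new State(oldState.currentTerm + 1, oldState.metaData + " [detached]");
            // 2. after user confirmation, overwrite the whole on-disk state in one commit;
            //    try-with-resources closes the writer even if the write fails
            try (Writer w = writer) {
                w.writeFullStateAndCommit(newState.currentTerm, newState);
            }
        }
    }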
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java
index 800269520e366..119edf32f1ddb 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java
@@ -26,41 +26,77 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.EnvironmentAwareCommand;
 import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.cluster.metadata.Manifest;
-import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.NodeMetaData;
+import org.elasticsearch.gateway.PersistedClusterStateService;
+import org.elasticsearch.indices.IndicesModule;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
     private static final Logger logger = LogManager.getLogger(ElasticsearchNodeCommand.class);
     protected static final String DELIMITER = "------------------------------------------------------------------------\n";
     static final String STOP_WARNING_MSG = DELIMITER + "\n" +
         "    WARNING: Elasticsearch MUST be stopped before running this tool." + "\n";
     protected static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = "failed to lock node's directory, is Elasticsearch still running?";
-    static final String NO_NODE_FOLDER_FOUND_MSG = "no node folder is found in data folder(s), node has not been started yet?";
-    static final String NO_MANIFEST_FILE_FOUND_MSG = "no manifest file is found, do you run pre 7.0 Elasticsearch?";
-    protected static final String GLOBAL_GENERATION_MISSING_MSG =
-        "no metadata is referenced from the manifest file, cluster has never been bootstrapped?";
-    static final String NO_GLOBAL_METADATA_MSG = "failed to find global metadata, metadata corrupted?";
-    static final String WRITE_METADATA_EXCEPTION_MSG = "exception occurred when writing new metadata to disk";
     protected static final String ABORTED_BY_USER_MSG = "aborted by user";
+    static final String NO_NODE_FOLDER_FOUND_MSG = "no node folder is found in data folder(s), node has not been started yet?";
+    static final String NO_NODE_METADATA_FOUND_MSG = "no node meta data is found, node has not been started yet?";
+    protected static final String CS_MISSING_MSG =
+        "cluster state is empty, cluster has never been bootstrapped?";
+
+    protected static final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(
+        Stream.of(ClusterModule.getNamedXWriteables().stream(), IndicesModule.getNamedXContents().stream())
+            .flatMap(Function.identity())
+            .collect(Collectors.toList()));
 
     public ElasticsearchNodeCommand(String description) {
         super(description);
     }
 
+    public static PersistedClusterStateService createPersistedClusterStateService(Path[] dataPaths) throws IOException {
+        final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
+        if (nodeMetaData == null) {
+            throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
+        }
+
+        String nodeId = nodeMetaData.nodeId();
+        return new PersistedClusterStateService(dataPaths, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, true);
+    }
+
+    public static ClusterState clusterState(Environment environment, PersistedClusterStateService.OnDiskState onDiskState) {
+        return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(environment.settings()))
+            .version(onDiskState.lastAcceptedVersion)
+            .metaData(onDiskState.metaData)
+            .build();
+    }
+
+    public static Tuple<Long, ClusterState> loadTermAndClusterState(PersistedClusterStateService psf,
+                                                                    Environment env) throws IOException {
+        final PersistedClusterStateService.OnDiskState bestOnDiskState = psf.loadBestOnDiskState();
+        if (bestOnDiskState.empty()) {
+            throw new ElasticsearchException(CS_MISSING_MSG);
+        }
+        return Tuple.tuple(bestOnDiskState.currentTerm, clusterState(env, bestOnDiskState));
+    }
+
     protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException {
         terminal.println(Terminal.Verbosity.VERBOSE, "Obtaining lock for node");
         try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, env, Files::exists)) {
@@ -69,32 +105,12 @@ protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException {
             if (dataPaths.length == 0) {
                 throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG);
             }
-            processNodePaths(terminal, dataPaths, env);
+            processNodePaths(terminal, dataPaths, options, env);
         } catch (LockObtainFailedException e) {
             throw new ElasticsearchException(FAILED_TO_OBTAIN_NODE_LOCK_MSG, e);
         }
     }
 
-    protected Tuple<Manifest, MetaData> loadMetaData(Terminal terminal, Path[] dataPaths) throws IOException {
-        terminal.println(Terminal.Verbosity.VERBOSE, "Loading manifest file");
-        final Manifest manifest = Manifest.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
-
-        if (manifest == null) {
-            throw new ElasticsearchException(NO_MANIFEST_FILE_FOUND_MSG);
-        }
-        if (manifest.isGlobalGenerationMissing()) {
-            throw new ElasticsearchException(GLOBAL_GENERATION_MISSING_MSG);
-        }
-        terminal.println(Terminal.Verbosity.VERBOSE, "Loading global metadata file");
-        final MetaData metaData = MetaData.FORMAT_PRESERVE_CUSTOMS.loadGeneration(
-            logger, NamedXContentRegistry.EMPTY, manifest.getGlobalGeneration(), dataPaths);
-        if (metaData == null) {
-            throw new ElasticsearchException(NO_GLOBAL_METADATA_MSG + " [generation = " + manifest.getGlobalGeneration() + "]");
-        }
-
-        return Tuple.tuple(manifest, metaData);
-    }
-
     protected void confirm(Terminal terminal, String msg) {
         terminal.println(msg);
         String text = terminal.readText("Confirm [y/N] ");
@@ -104,7 +120,7 @@ protected void confirm(Terminal terminal, String msg) {
     }
 
     @Override
-    protected final void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
+    public final void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
         terminal.println(STOP_WARNING_MSG);
         if (validateBeforeLock(terminal, env)) {
             processNodePaths(terminal, options, env);
@@ -126,44 +142,10 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
      * Process the paths. Locks for the paths is held during this method invocation.
      * @param terminal the terminal to use for messages
      * @param dataPaths the paths of the node to process
+     * @param options the command line options
      * @param env the env of the node to process
      */
-    protected abstract void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException;
-
-
-    protected void writeNewMetaData(Terminal terminal, Manifest oldManifest, long newCurrentTerm,
-                                    MetaData oldMetaData, MetaData newMetaData, Path[] dataPaths) {
-        long newGeneration;
-        try {
-            terminal.println(Terminal.Verbosity.VERBOSE,
-                "[clusterUUID = " + oldMetaData.clusterUUID() + ", committed = " + oldMetaData.clusterUUIDCommitted() + "] => " +
-                "[clusterUUID = " + newMetaData.clusterUUID() + ", committed = " + newMetaData.clusterUUIDCommitted() + "]");
-            terminal.println(Terminal.Verbosity.VERBOSE, "New coordination metadata is " + newMetaData.coordinationMetaData());
-            terminal.println(Terminal.Verbosity.VERBOSE, "Writing new global metadata to disk");
-            newGeneration = MetaData.FORMAT.write(newMetaData, dataPaths);
-            Manifest newManifest = new Manifest(newCurrentTerm, oldManifest.getClusterStateVersion(), newGeneration,
-                oldManifest.getIndexGenerations());
-            terminal.println(Terminal.Verbosity.VERBOSE, "New manifest is " + newManifest);
-            terminal.println(Terminal.Verbosity.VERBOSE, "Writing new manifest file to disk");
-            Manifest.FORMAT.writeAndCleanup(newManifest, dataPaths);
-        } catch (Exception e) {
-            terminal.println(Terminal.Verbosity.VERBOSE, "Cleaning up new metadata");
-            MetaData.FORMAT.cleanupOldFiles(oldManifest.getGlobalGeneration(), dataPaths);
-            throw new ElasticsearchException(WRITE_METADATA_EXCEPTION_MSG, e);
-        }
-
-        // if cleaning old files fail, we still succeeded.
-        try {
-            cleanUpOldMetaData(terminal, dataPaths, newGeneration);
-        } catch (Exception e) {
-            terminal.println(Terminal.Verbosity.SILENT,
-                "Warning: Cleaning up old metadata failed, but operation was otherwise successful (message: " + e.getMessage() + ")");
-        }
-    }
-
-    protected void cleanUpOldMetaData(Terminal terminal, Path[] dataPaths, long newGeneration) {
-        terminal.println(Terminal.Verbosity.VERBOSE, "Cleaning up old metadata");
-        MetaData.FORMAT.cleanupOldFiles(newGeneration, dataPaths);
-    }
+    protected abstract void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException;
 
     protected NodeEnvironment.NodePath[] toNodePaths(Path[] dataPaths) {
         return Arrays.stream(dataPaths).map(ElasticsearchNodeCommand::createNodePath).toArray(NodeEnvironment.NodePath[]::new);
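Note: every node tool now implements the same four-argument processNodePaths contract and can lean on the static helpers added above. A hypothetical subclass (the command itself is made up; every member it calls appears in this diff, and it is assumed to live in the same package as the commands above):

    import joptsimple.OptionSet;
    import org.elasticsearch.cli.Terminal;
    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.common.collect.Tuple;
    import org.elasticsearch.env.Environment;
    import org.elasticsearch.gateway.PersistedClusterStateService;

    import java.io.IOException;
    import java.nio.file.Path;

    public class ShowClusterStateCommand extends ElasticsearchNodeCommand {

        public ShowClusterStateCommand() {
            super("Prints the term and cluster UUID of the on-disk cluster state");
        }

        @Override
        protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
            // the base class has already taken the node lock and resolved the data paths
            final PersistedClusterStateService service = createPersistedClusterStateService(dataPaths);
            final Tuple<Long, ClusterState> termAndState = loadTermAndClusterState(service, env);
            terminal.println("current term = " + termAndState.v1());
            terminal.println("cluster UUID = " + termAndState.v2().metaData().clusterUUID());
        }
    }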
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java
index 05bc0116c13c6..2b9e7db1e98bd 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java
@@ -18,19 +18,17 @@
  */
 package org.elasticsearch.cluster.coordination;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import joptsimple.OptionSet;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.cluster.metadata.Manifest;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
-import org.elasticsearch.env.NodeMetaData;
+import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.node.Node;
 
 import java.io.IOException;
@@ -40,8 +38,6 @@
 
 public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {
 
-    private static final Logger logger = LogManager.getLogger(UnsafeBootstrapMasterCommand.class);
-
     static final String CLUSTER_STATE_TERM_VERSION_MSG_FORMAT =
         "Current node cluster state (term, version) pair is (%s, %s)";
 
     static final String CONFIRMATION_MSG =
@@ -58,8 +54,6 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {
 
     static final String NOT_MASTER_NODE_MSG = "unsafe-bootstrap tool can only be run on master eligible node";
 
-    static final String NO_NODE_METADATA_FOUND_MSG = "no node meta data is found, node has not been started yet?";
-
     static final String EMPTY_LAST_COMMITTED_VOTING_CONFIG_MSG =
         "last committed voting voting configuration is empty, cluster has never been bootstrapped?";
 
@@ -83,49 +77,54 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
         return true;
     }
 
-    protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
-        terminal.println(Terminal.Verbosity.VERBOSE, "Loading node metadata");
-        final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
-        if (nodeMetaData == null) {
-            throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
-        }
+    protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
+        final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
 
-        String nodeId = nodeMetaData.nodeId();
-        terminal.println(Terminal.Verbosity.VERBOSE, "Current nodeId is " + nodeId);
+        final Tuple<Long, ClusterState> state = loadTermAndClusterState(persistedClusterStateService, env);
+        final ClusterState oldClusterState = state.v2();
+
+        final MetaData metaData = oldClusterState.metaData();
 
-        final Tuple<Manifest, MetaData> manifestMetaDataTuple = loadMetaData(terminal, dataPaths);
-        final Manifest manifest = manifestMetaDataTuple.v1();
-        final MetaData metaData = manifestMetaDataTuple.v2();
         final CoordinationMetaData coordinationMetaData = metaData.coordinationMetaData();
         if (coordinationMetaData == null ||
-            coordinationMetaData.getLastCommittedConfiguration() == null ||
-            coordinationMetaData.getLastCommittedConfiguration().isEmpty()) {
+                coordinationMetaData.getLastCommittedConfiguration() == null ||
+                coordinationMetaData.getLastCommittedConfiguration().isEmpty()) {
             throw new ElasticsearchException(EMPTY_LAST_COMMITTED_VOTING_CONFIG_MSG);
         }
         terminal.println(String.format(Locale.ROOT, CLUSTER_STATE_TERM_VERSION_MSG_FORMAT, coordinationMetaData.term(),
-            metaData.version()));
-
-        confirm(terminal, CONFIRMATION_MSG);
+                metaData.version()));
 
         CoordinationMetaData newCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData)
-            .clearVotingConfigExclusions()
-            .lastAcceptedConfiguration(new CoordinationMetaData.VotingConfiguration(Collections.singleton(nodeId)))
-            .lastCommittedConfiguration(new CoordinationMetaData.VotingConfiguration(Collections.singleton(nodeId)))
-            .build();
+                .clearVotingConfigExclusions()
+                .lastAcceptedConfiguration(new CoordinationMetaData.VotingConfiguration(
+                    Collections.singleton(persistedClusterStateService.getNodeId())))
+                .lastCommittedConfiguration(new CoordinationMetaData.VotingConfiguration(
+                    Collections.singleton(persistedClusterStateService.getNodeId())))
+                .build();
 
         Settings persistentSettings = Settings.builder()
-            .put(metaData.persistentSettings())
-            .put(UNSAFE_BOOTSTRAP.getKey(), true)
-            .build();
+                .put(metaData.persistentSettings())
+                .put(UNSAFE_BOOTSTRAP.getKey(), true)
+                .build();
         MetaData newMetaData = MetaData.builder(metaData)
-            .clusterUUID(MetaData.UNKNOWN_CLUSTER_UUID)
-            .generateClusterUuidIfNeeded()
-            .clusterUUIDCommitted(true)
-            .persistentSettings(persistentSettings)
-            .coordinationMetaData(newCoordinationMetaData)
-            .build();
-
-        writeNewMetaData(terminal, manifest, manifest.getCurrentTerm(), metaData, newMetaData, dataPaths);
+                .clusterUUID(MetaData.UNKNOWN_CLUSTER_UUID)
+                .generateClusterUuidIfNeeded()
+                .clusterUUIDCommitted(true)
+                .persistentSettings(persistentSettings)
+                .coordinationMetaData(newCoordinationMetaData)
+                .build();
+
+        final ClusterState newClusterState = ClusterState.builder(oldClusterState)
+            .metaData(newMetaData).build();
+
+        terminal.println(Terminal.Verbosity.VERBOSE,
+            "[old cluster state = " + oldClusterState + ", new cluster state = " + newClusterState + "]");
+
+        confirm(terminal, CONFIRMATION_MSG);
+
+        try (PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter()) {
+            writer.writeFullStateAndCommit(state.v1(), newClusterState);
+        }
 
         terminal.println(MASTER_NODE_BOOTSTRAPPED_MSG);
     }
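Note: the unsafe-bootstrap trick above is to shrink both the last-accepted and last-committed voting configurations down to this one node, so its own vote is a quorum. A plain-JDK illustration of the majority rule (this mirrors the intended semantics of VotingConfiguration.hasQuorum, not its actual implementation):

    import java.util.Set;

    final class QuorumSketch {
        // a quorum is a strict majority of the voting configuration
        static boolean hasQuorum(Set<String> votingConfiguration, Set<String> votesReceived) {
            long votes = votingConfiguration.stream().filter(votesReceived::contains).count();
            return votes * 2 > votingConfiguration.size();
        }

        public static void main(String[] args) {
            // with a singleton configuration the node can elect itself...
            System.out.println(hasQuorum(Set.of("this-node"), Set.of("this-node")));      // true
            // ...whereas in the original three-node configuration it could not
            System.out.println(hasQuorum(Set.of("a", "b", "this-node"), Set.of("this-node"))); // false
        }
    }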
diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
index 2a4512261c385..fb0422e1182b8 100644
--- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
+++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -50,7 +50,7 @@
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.gateway.LucenePersistedStateFactory;
+import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.gateway.MetaDataStateFormat;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
@@ -388,7 +388,7 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings
             MetaDataStateFormat.STATE_DIR_NAME,
             // Lucene-based metadata folder
-            LucenePersistedStateFactory.METADATA_DIRECTORY_NAME,
+            PersistedClusterStateService.METADATA_DIRECTORY_NAME,
             // indices
             INDICES_FOLDER));
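For orientation, the folder names that upgradeLegacyNodeFolders carries over map onto the node's data path roughly as follows (the nodes/0 prefix is the standard 7.x layout and is an assumption here, not shown in this diff):

    // $DATA_PATH/nodes/0/_state/     -> MetaDataStateFormat.STATE_DIR_NAME: legacy per-file metadata
    // $DATA_PATH/nodes/0/_metadata/  -> PersistedClusterStateService.METADATA_DIRECTORY_NAME: the Lucene index
    // $DATA_PATH/nodes/0/indices/    -> INDICES_FOLDER: per-index shard data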
diff --git a/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java b/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java
index 25b4f79866eaa..90629a486125d 100644
--- a/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java
+++ b/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java
@@ -18,42 +18,41 @@
  */
 package org.elasticsearch.env;
 
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.Manifest;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.gateway.WriteStateException;
+import org.elasticsearch.gateway.PersistedClusterStateService;
+import org.elasticsearch.gateway.MetaDataStateFormat;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
-public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
+import static org.elasticsearch.env.NodeEnvironment.INDICES_FOLDER;
 
-    private static final Logger logger = LogManager.getLogger(NodeRepurposeCommand.class);
+public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
 
     static final String ABORTED_BY_USER_MSG = ElasticsearchNodeCommand.ABORTED_BY_USER_MSG;
     static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = ElasticsearchNodeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG;
     static final String NO_CLEANUP = "Node has node.data=true -> no clean up necessary";
     static final String NO_DATA_TO_CLEAN_UP_FOUND = "No data to clean-up found";
     static final String NO_SHARD_DATA_TO_CLEAN_UP_FOUND = "No shard data to clean-up found";
-    static final String PRE_V7_MESSAGE =
-        "No manifest file found. If you were previously running this node on Elasticsearch version 6, please proceed.\n" +
-        "If this node was ever started on Elasticsearch version 7 or higher, it might mean metadata corruption, please abort.";
 
     public NodeRepurposeCommand() {
         super("Repurpose this node to another master/data role, cleaning up any excess persisted data");
@@ -75,17 +74,17 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
     }
 
     @Override
-    protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
+    protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
         assert DiscoveryNode.isDataNode(env.settings()) == false;
 
         if (DiscoveryNode.isMasterNode(env.settings()) == false) {
-            processNoMasterNoDataNode(terminal, dataPaths);
+            processNoMasterNoDataNode(terminal, dataPaths, env);
         } else {
-            processMasterNoDataNode(terminal, dataPaths);
+            processMasterNoDataNode(terminal, dataPaths, env);
         }
     }
 
-    private void processNoMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
+    private void processNoMasterNoDataNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
         NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
 
         terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
@@ -95,32 +94,38 @@ private void processNoMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
         List<Path> indexMetaDataPaths = NodeEnvironment.collectIndexMetaDataPaths(nodePaths);
         Set<Path> indexPaths = uniqueParentPaths(shardDataPaths, indexMetaDataPaths);
-        if (indexPaths.isEmpty()) {
+
+        final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
+
+        final MetaData metaData = loadClusterState(terminal, env, persistedClusterStateService).metaData();
+        if (indexPaths.isEmpty() && metaData.indices().isEmpty()) {
             terminal.println(Terminal.Verbosity.NORMAL, NO_DATA_TO_CLEAN_UP_FOUND);
             return;
         }
 
-        Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
-        outputVerboseInformation(terminal, nodePaths, indexPaths, indexUUIDs);
+        final Set<String> indexUUIDs = Sets.union(indexUUIDsFor(indexPaths),
+            StreamSupport.stream(metaData.indices().values().spliterator(), false)
+                .map(imd -> imd.value.getIndexUUID()).collect(Collectors.toSet()));
+
+        outputVerboseInformation(terminal, indexPaths, indexUUIDs, metaData);
 
         terminal.println(noMasterMessage(indexUUIDs.size(), shardDataPaths.size(), indexMetaDataPaths.size()));
         outputHowToSeeVerboseInformation(terminal);
 
-        final Manifest manifest = loadManifest(terminal, dataPaths);
-
         terminal.println("Node is being re-purposed as no-master and no-data. Clean-up of index data will be performed.");
         confirm(terminal, "Do you want to proceed?");
 
-        if (manifest != null) {
-            rewriteManifest(terminal, manifest, dataPaths);
-        }
-
-        removePaths(terminal, indexPaths);
+        removePaths(terminal, indexPaths); // clean-up shard dirs
+        // clean-up all metadata dirs
+        IOUtils.rm(Stream.of(dataPaths).map(path -> path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))
+            .toArray(Path[]::new));
+        MetaDataStateFormat.deleteMetaState(dataPaths);
+        IOUtils.rm(Stream.of(dataPaths).map(path -> path.resolve(INDICES_FOLDER)).toArray(Path[]::new));
 
         terminal.println("Node successfully repurposed to no-master and no-data.");
     }
 
-    private void processMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
+    private void processMasterNoDataNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
         NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
 
         terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
@@ -130,9 +135,14 @@ private void processMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
             return;
         }
 
-        Set<Path> indexPaths = uniqueParentPaths(shardDataPaths);
-        Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
-        outputVerboseInformation(terminal, nodePaths, shardDataPaths, indexUUIDs);
+        final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
+
+        final MetaData metaData = loadClusterState(terminal, env, persistedClusterStateService).metaData();
+
+        final Set<Path> indexPaths = uniqueParentPaths(shardDataPaths);
+        final Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
+
+        outputVerboseInformation(terminal, shardDataPaths, indexUUIDs, metaData);
 
         terminal.println(shardMessage(shardDataPaths.size(), indexUUIDs.size()));
         outputHowToSeeVerboseInformation(terminal);
@@ -140,18 +150,22 @@ private void processMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
         terminal.println("Node is being re-purposed as master and no-data. Clean-up of shard data will be performed.");
         confirm(terminal, "Do you want to proceed?");
 
-        removePaths(terminal, shardDataPaths);
+        removePaths(terminal, shardDataPaths); // clean-up shard dirs
 
         terminal.println("Node successfully repurposed to master and no-data.");
     }
 
-    private void outputVerboseInformation(Terminal terminal, NodeEnvironment.NodePath[] nodePaths,
-                                          Collection<Path> pathsToCleanup, Set<String> indexUUIDs) {
+    private ClusterState loadClusterState(Terminal terminal, Environment env, PersistedClusterStateService psf) throws IOException {
+        terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
+        return clusterState(env, psf.loadBestOnDiskState());
+    }
+
+    private void outputVerboseInformation(Terminal terminal, Collection<Path> pathsToCleanup, Set<String> indexUUIDs, MetaData metaData) {
         if (terminal.isPrintable(Terminal.Verbosity.VERBOSE)) {
             terminal.println(Terminal.Verbosity.VERBOSE, "Paths to clean up:");
             pathsToCleanup.forEach(p -> terminal.println(Terminal.Verbosity.VERBOSE, "  " + p.toString()));
             terminal.println(Terminal.Verbosity.VERBOSE, "Indices affected:");
-            indexUUIDs.forEach(uuid -> terminal.println(Terminal.Verbosity.VERBOSE, "  " + toIndexName(nodePaths, uuid)));
+            indexUUIDs.forEach(uuid -> terminal.println(Terminal.Verbosity.VERBOSE, "  " + toIndexName(uuid, metaData)));
         }
     }
 
@@ -160,17 +174,15 @@ private void outputHowToSeeVerboseInformation(Terminal terminal) {
             terminal.println("Use -v to see list of paths and indices affected");
         }
     }
 
-    private String toIndexName(NodeEnvironment.NodePath[] nodePaths, String uuid) {
-        Path[] indexPaths = new Path[nodePaths.length];
-        for (int i = 0; i < nodePaths.length; i++) {
-            indexPaths[i] = nodePaths[i].resolve(uuid);
-        }
-        try {
-            IndexMetaData metaData = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, indexPaths);
-            return metaData.getIndex().getName();
-        } catch (Exception e) {
-            return "no name for uuid: " + uuid + ": " + e;
+    private String toIndexName(String uuid, MetaData metaData) {
+        if (metaData != null) {
+            for (ObjectObjectCursor<String, IndexMetaData> indexMetaData : metaData.indices()) {
+                if (indexMetaData.value.getIndexUUID().equals(uuid)) {
+                    return indexMetaData.value.getIndex().getName();
+                }
+            }
         }
+        return "no name for uuid: " + uuid;
     }
 
     private Set<String> indexUUIDsFor(Set<Path> indexPaths) {
@@ -186,23 +198,6 @@ static String shardMessage(int shards, int indices) {
         return "Found " + shards + " shards in " + indices + " indices to clean up";
     }
 
-    private void rewriteManifest(Terminal terminal, Manifest manifest, Path[] dataPaths) throws WriteStateException {
-        terminal.println(Terminal.Verbosity.VERBOSE, "Re-writing manifest");
-        Manifest newManifest = new Manifest(manifest.getCurrentTerm(), manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
-            new HashMap<>());
-        Manifest.FORMAT.writeAndCleanup(newManifest, dataPaths);
-    }
-
-    private Manifest loadManifest(Terminal terminal, Path[] dataPaths) throws IOException {
-        terminal.println(Terminal.Verbosity.VERBOSE, "Loading manifest");
-        final Manifest manifest = Manifest.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
-
-        if (manifest == null) {
-            terminal.println(Terminal.Verbosity.SILENT, PRE_V7_MESSAGE);
-        }
-        return manifest;
-    }
-
     private void removePaths(Terminal terminal, Collection<Path> paths) {
         terminal.println(Terminal.Verbosity.VERBOSE, "Removing data");
         paths.forEach(this::removePath);
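Note: the no-master/no-data branch now deletes in a fixed order: the collected shard and index paths, then each data path's Lucene metadata directory, then the legacy state files, then the whole indices folder. A self-contained JDK-only sketch of that sweep (directory names are hardcoded for illustration; the real code uses the constants named in the diff and Elasticsearch's IOUtils.rm):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    final class RepurposeCleanupSketch {

        // recursive delete, standing in for Elasticsearch's IOUtils.rm
        static void rm(Path dir) throws IOException {
            if (Files.notExists(dir)) {
                return;
            }
            try (Stream<Path> walk = Files.walk(dir)) {
                walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
            }
        }

        // mirrors the order in processNoMasterNoDataNode: metadata dirs after shard dirs,
        // then legacy state, then the indices folder, applied to every data path
        static void cleanUp(Path[] dataPaths) throws IOException {
            for (Path dataPath : dataPaths) {
                rm(dataPath.resolve("_metadata")); // PersistedClusterStateService.METADATA_DIRECTORY_NAME
                rm(dataPath.resolve("_state"));    // legacy MetaDataStateFormat state (location assumed)
                rm(dataPath.resolve("indices"));   // NodeEnvironment.INDICES_FOLDER
            }
        }
    }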
diff --git a/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java b/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java
index f50bdf081ef85..aa2235e2a252b 100644
--- a/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java
+++ b/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java
@@ -19,6 +19,7 @@
 package org.elasticsearch.env;
 
 import joptsimple.OptionParser;
+import joptsimple.OptionSet;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
@@ -72,7 +73,7 @@ public OverrideNodeVersionCommand() {
     }
 
     @Override
-    protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
+    protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
         final Path[] nodePaths = Arrays.stream(toNodePaths(dataPaths)).map(p -> p.path).toArray(Path[]::new);
         final NodeMetaData nodeMetaData = new NodeMetaData.NodeMetaDataStateFormat(true)
             .loadLatestState(logger, NamedXContentRegistry.EMPTY, nodePaths);
diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
index cfffc89fac3b1..bad9dfafe0f64 100644
--- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
+++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -29,11 +29,13 @@
 import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
+import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.plugins.MetaDataUpgrader;
@@ -41,6 +43,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -72,19 +75,49 @@ public MetaData getMetaData() {
     }
 
     public void start(Settings settings, TransportService transportService, ClusterService clusterService,
-                      MetaDataIndexUpgradeService metaDataIndexUpgradeService,
-                      MetaDataUpgrader metaDataUpgrader, LucenePersistedStateFactory lucenePersistedStateFactory) {
+                      MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
+                      MetaDataUpgrader metaDataUpgrader, PersistedClusterStateService persistedClusterStateService) {
         assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();
 
         if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
             try {
-                persistedState.set(lucenePersistedStateFactory.loadPersistedState((version, metadata) ->
-                    prepareInitialClusterState(transportService, clusterService,
+                final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();
+
+                MetaData metaData = onDiskState.metaData;
+                long lastAcceptedVersion = onDiskState.lastAcceptedVersion;
+                long currentTerm = onDiskState.currentTerm;
+
+                if (onDiskState.empty()) {
+                    assert Version.CURRENT.major <= Version.V_7_0_0.major + 1 :
+                        "legacy metadata loader is not needed anymore from v9 onwards";
+                    final Tuple<Manifest, MetaData> legacyState = metaStateService.loadFullState();
+                    if (legacyState.v1().isEmpty() == false) {
+                        metaData = legacyState.v2();
+                        lastAcceptedVersion = legacyState.v1().getClusterStateVersion();
+                        currentTerm = legacyState.v1().getCurrentTerm();
+                    }
+                }
+
+                final PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter();
+                final LucenePersistedState lucenePersistedState;
+                boolean success = false;
+                try {
+                    final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService,
                         ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
-                            .version(version)
-                            .metaData(upgradeMetaDataForNode(metadata, metaDataIndexUpgradeService, metaDataUpgrader))
-                            .build()))
-                );
+                            .version(lastAcceptedVersion)
+                            .metaData(upgradeMetaDataForNode(metaData, metaDataIndexUpgradeService, metaDataUpgrader))
+                            .build());
+                    lucenePersistedState = new LucenePersistedState(
+                        persistenceWriter, currentTerm, clusterState);
+                    metaStateService.deleteAll(); // delete legacy files
+                    success = true;
+                } finally {
+                    if (success == false) {
+                        IOUtils.closeWhileHandlingException(persistenceWriter);
+                    }
+                }
+
+                persistedState.set(lucenePersistedState);
             } catch (IOException e) {
                 throw new ElasticsearchException("failed to load metadata", e);
             }
@@ -167,4 +200,68 @@ public void close() throws IOException {
         IOUtils.close(persistedState.get());
     }
 
+    /**
+     * Encapsulates the incremental writing of metadata to a {@link PersistedClusterStateService.Writer}.
+     */
+    static class LucenePersistedState implements PersistedState {
+
+        private long currentTerm;
+        private ClusterState lastAcceptedState;
+        private final PersistedClusterStateService.Writer persistenceWriter;
+
+        LucenePersistedState(PersistedClusterStateService.Writer persistenceWriter, long currentTerm, ClusterState lastAcceptedState)
+            throws IOException {
+            this.persistenceWriter = persistenceWriter;
+            this.currentTerm = currentTerm;
+            this.lastAcceptedState = lastAcceptedState;
+            // Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that
+            // (1) throwing an IOException is enough to halt the node, and
+            // (2) the index is currently empty since it was opened with IndexWriterConfig.OpenMode.CREATE
+
+            // In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance,
+            // this is true if there's only one data path on this master node, and the commit we just loaded was already written out
+            // by this version of Elasticsearch. TODO TBD should we avoid indexing when possible?
+            persistenceWriter.writeFullStateAndCommit(currentTerm, lastAcceptedState);
+        }
+
+        @Override
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        @Override
+        public ClusterState getLastAcceptedState() {
+            return lastAcceptedState;
+        }
+
+        @Override
+        public void setCurrentTerm(long currentTerm) {
+            persistenceWriter.commit(currentTerm, lastAcceptedState.version());
+            this.currentTerm = currentTerm;
+        }
+
+        @Override
+        public void setLastAcceptedState(ClusterState clusterState) {
+            try {
+                if (clusterState.term() != lastAcceptedState.term()) {
+                    assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
+                    // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state,
+                    // so it's simplest to write everything again.
+                    persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
+                } else {
+                    // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing.
+                    persistenceWriter.writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
+                }
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+
+            lastAcceptedState = clusterState;
+        }
+
+        @Override
+        public void close() throws IOException {
+            persistenceWriter.close();
+        }
+    }
 }
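Note: the heart of LucenePersistedState.setLastAcceptedState is the rule that a term change forces a full rewrite, while a version change within the same term permits an incremental write. The same decision, distilled into a standalone sketch (the StateWriter interface is a stand-in for PersistedClusterStateService.Writer, reduced to term and version numbers):

    import java.io.IOException;

    final class PersistDecisionSketch {

        interface StateWriter {
            void writeFullStateAndCommit(long term, long version) throws IOException;
            void writeIncrementalStateAndCommit(long term, long previousVersion, long version) throws IOException;
        }

        static void persist(StateWriter writer, long previousTerm, long previousVersion,
                            long newTerm, long newVersion) throws IOException {
            if (newTerm != previousTerm) {
                // terms only move forward; across terms the per-document versions are not
                // comparable, so everything is rewritten from scratch
                assert newTerm > previousTerm : newTerm + " vs " + previousTerm;
                writer.writeFullStateAndCommit(newTerm, newVersion);
            } else {
                // within one term, unchanged documents can be skipped by comparing versions
                writer.writeIncrementalStateAndCommit(newTerm, previousVersion, newVersion);
            }
        }
    }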
"node_id"; @@ -124,58 +119,38 @@ public class LucenePersistedStateFactory { public static final String METADATA_DIRECTORY_NAME = "_metadata"; - private final NodeEnvironment nodeEnvironment; + private final Path[] dataPaths; + private final String nodeId; private final NamedXContentRegistry namedXContentRegistry; private final BigArrays bigArrays; - private final LegacyLoader legacyLoader; + private final boolean preserveUnknownCustoms; - /** - * Allows interacting with legacy metadata - */ - public interface LegacyLoader { - /** - * Loads legacy state - */ - Tuple loadClusterState() throws IOException; - - /** - * Cleans legacy state - */ - void clean() throws IOException; - } - - LucenePersistedStateFactory(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) { - this(nodeEnvironment, namedXContentRegistry, bigArrays, new LegacyLoader() { - @Override - public Tuple loadClusterState() { - return new Tuple<>(Manifest.empty(), MetaData.EMPTY_META_DATA); - } - - @Override - public void clean() { - - } - }); + public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) { + this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, false); } - public LucenePersistedStateFactory(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays, - LegacyLoader legacyLoader) { - this.nodeEnvironment = nodeEnvironment; + public PersistedClusterStateService(Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays, + boolean preserveUnknownCustoms) { + this.dataPaths = dataPaths; + this.nodeId = nodeId; this.namedXContentRegistry = namedXContentRegistry; this.bigArrays = bigArrays; - this.legacyLoader = legacyLoader; + this.preserveUnknownCustoms = preserveUnknownCustoms; } - CoordinationState.PersistedState loadPersistedState(BiFunction clusterStateFromMetaData) - throws IOException { - - final OnDiskState onDiskState = loadBestOnDiskState(); + public String getNodeId() { + return nodeId; + } + /** + * Creates a new disk-based writer for cluster states + */ + public Writer createWriter() throws IOException { final List metaDataIndexWriters = new ArrayList<>(); final List closeables = new ArrayList<>(); boolean success = false; try { - for (final Path path : nodeEnvironment.nodeDataPaths()) { + for (final Path path : dataPaths) { final Directory directory = createDirectory(path.resolve(METADATA_DIRECTORY_NAME)); closeables.add(directory); @@ -199,21 +174,7 @@ CoordinationState.PersistedState loadPersistedState(BiFunction maxAcceptedTerm || (acceptedTerm == maxAcceptedTerm && (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion @@ -300,15 +268,6 @@ private OnDiskState loadBestOnDiskState() throws IOException { "] with greater term [" + maxCurrentTermOnDiskState.currentTerm + "]"); } - if (bestOnDiskState == NO_ON_DISK_STATE) { - assert Version.CURRENT.major <= Version.V_7_0_0.major + 1 : "legacy metadata loader is not needed anymore from v9 onwards"; - final Tuple legacyState = legacyLoader.loadClusterState(); - if (legacyState.v1().isEmpty() == false) { - return new OnDiskState(nodeEnvironment.nodeId(), null, legacyState.v1().getCurrentTerm(), - legacyState.v1().getClusterStateVersion(), legacyState.v2()); - } - } - return bestOnDiskState; } @@ -319,8 +278,9 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw final 
SetOnce builderReference = new SetOnce<>(); consumeFromType(searcher, GLOBAL_TYPE_NAME, bytes -> { - final MetaData metaData = MetaData.fromXContent(XContentFactory.xContent(XContentType.SMILE) - .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytes.bytes, bytes.offset, bytes.length)); + final MetaData metaData = MetaData.Builder.fromXContent(XContentFactory.xContent(XContentType.SMILE) + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytes.bytes, bytes.offset, bytes.length), + preserveUnknownCustoms); logger.trace("found global metadata with last-accepted term [{}]", metaData.coordinationMetaData().term()); if (builderReference.get() != null) { throw new IllegalStateException("duplicate global metadata found in [" + dataPath + "]"); @@ -472,96 +432,62 @@ public void close() throws IOException { } } - /** - * Encapsulates the incremental writing of metadata to a collection of {@link MetaDataIndexWriter}s. - */ - private static class LucenePersistedState implements CoordinationState.PersistedState, Closeable { + public static class Writer implements Closeable { - private long currentTerm; - private ClusterState lastAcceptedState; private final List metaDataIndexWriters; private final String nodeId; private final BigArrays bigArrays; - LucenePersistedState(String nodeId, List metaDataIndexWriters, long currentTerm, - ClusterState lastAcceptedState, BigArrays bigArrays) { - this.currentTerm = currentTerm; - this.lastAcceptedState = lastAcceptedState; + boolean fullStateWritten = false; + + private Writer(List metaDataIndexWriters, String nodeId, BigArrays bigArrays) { this.metaDataIndexWriters = metaDataIndexWriters; this.nodeId = nodeId; this.bigArrays = bigArrays; } - @Override - public long getCurrentTerm() { - return currentTerm; - } - - @Override - public ClusterState getLastAcceptedState() { - return lastAcceptedState; - } - - void persistInitialState() throws IOException { - // Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that - // (1) throwing an IOException is enough to halt the node, and - // (2) the index is currently empty since it was opened with IndexWriterConfig.OpenMode.CREATE - - // In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance, this is - // true if there's only one data path on this master node, and the commit we just loaded was already written out by this - // version of Elasticsearch. TODO TBD should we avoid indexing when possible? - addMetaData(lastAcceptedState); - commit(currentTerm, lastAcceptedState.getVersion()); - } - - @Override - public void setCurrentTerm(long currentTerm) { - commit(currentTerm, lastAcceptedState.version()); - this.currentTerm = currentTerm; + /** + * Overrides and commits the given current term and cluster state + */ + public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException { + overwriteMetaData(clusterState.metaData()); + commit(currentTerm, clusterState.version()); + fullStateWritten = true; } - @Override - public void setLastAcceptedState(ClusterState clusterState) { - try { - if (clusterState.term() != lastAcceptedState.term()) { - assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); - // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so - // it's simplest to write everything again. 
- overwriteMetaData(clusterState); - } else { - // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. - updateMetaData(clusterState); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - + /** + * Updates and commits the given cluster state update + */ + void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, + ClusterState clusterState) throws IOException { + assert fullStateWritten : "Need to write full state first before doing incremental writes"; + updateMetaData(previousClusterState.metaData(), clusterState.metaData()); commit(currentTerm, clusterState.version()); - lastAcceptedState = clusterState; } /** * Update the persisted metadata to match the given cluster state by removing any stale or unnecessary documents and adding any * updated documents. */ - private void updateMetaData(ClusterState clusterState) throws IOException { - assert lastAcceptedState.term() == clusterState.term(); - logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only", clusterState.term()); + private void updateMetaData(MetaData previouslyWrittenMetaData, MetaData metaData) throws IOException { + assert previouslyWrittenMetaData.coordinationMetaData().term() == metaData.coordinationMetaData().term(); + logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only", + metaData.coordinationMetaData().term()); - try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(clusterState)) { + try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(metaData)) { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument()); } } - final Map indexMetaDataVersionByUUID = new HashMap<>(lastAcceptedState.metaData().indices().size()); - for (ObjectCursor cursor : lastAcceptedState.metaData().indices().values()) { + final Map indexMetaDataVersionByUUID = new HashMap<>(previouslyWrittenMetaData.indices().size()); + for (ObjectCursor cursor : previouslyWrittenMetaData.indices().values()) { final IndexMetaData indexMetaData = cursor.value; final Long previousValue = indexMetaDataVersionByUUID.putIfAbsent(indexMetaData.getIndexUUID(), indexMetaData.getVersion()); assert previousValue == null : indexMetaData.getIndexUUID() + " already mapped to " + previousValue; } - for (ObjectCursor cursor : clusterState.metaData().indices().values()) { + for (ObjectCursor cursor : metaData.indices().values()) { final IndexMetaData indexMetaData = cursor.value; final Long previousVersion = indexMetaDataVersionByUUID.get(indexMetaData.getIndexUUID()); if (previousVersion == null || indexMetaData.getVersion() != previousVersion) { @@ -594,24 +520,24 @@ private void updateMetaData(ClusterState clusterState) throws IOException { /** * Update the persisted metadata to match the given cluster state by removing all existing documents and then adding new documents. */ - private void overwriteMetaData(ClusterState clusterState) throws IOException { + private void overwriteMetaData(MetaData metaData) throws IOException { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.deleteAll(); } - addMetaData(clusterState); + addMetaData(metaData); } /** * Add documents for the metadata of the given cluster state, assuming that there are currently no documents. 
*/ - private void addMetaData(ClusterState clusterState) throws IOException { - try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(clusterState)) { + private void addMetaData(MetaData metaData) throws IOException { + try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(metaData)) { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument()); } } - for (ObjectCursor cursor : clusterState.metaData().indices().values()) { + for (ObjectCursor cursor : metaData.indices().values()) { final IndexMetaData indexMetaData = cursor.value; try (ReleasableDocument indexMetaDataDocument = makeIndexMetaDataDocument(indexMetaData)) { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { @@ -627,7 +553,7 @@ private void addMetaData(ClusterState clusterState) throws IOException { } } - private void commit(long currentTerm, long lastAcceptedVersion) { + public void commit(long currentTerm, long lastAcceptedVersion) { try { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion); @@ -662,8 +588,8 @@ private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData } } - private ReleasableDocument makeGlobalMetaDataDocument(ClusterState clusterState) throws IOException { - return makeDocument(GLOBAL_TYPE_NAME, clusterState.metaData()); + private ReleasableDocument makeGlobalMetaDataDocument(MetaData metaData) throws IOException { + return makeDocument(GLOBAL_TYPE_NAME, metaData); } private ReleasableDocument makeDocument(String typeName, ToXContent metaData) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java index b7a178972fa72..045c1197dc5a8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -21,7 +21,6 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; @@ -33,9 +32,9 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cli.EnvironmentAwareCommand; import org.elasticsearch.cli.Terminal; -import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; @@ -49,7 +48,6 @@ import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetaData; @@ -60,24 +58,20 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import 
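Note: Writer.updateMetaData decides which IndexMetaData documents to re-index by comparing each index's metadata version against the previously written one, keyed by index UUID. The same diffing step over plain maps (names hypothetical, JDK only):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    final class IndexMetaDataDiffSketch {

        /** Returns the index UUIDs whose metadata document must be rewritten. */
        static List<String> uuidsToRewrite(Map<String, Long> previousVersionByUuid, Map<String, Long> currentVersionByUuid) {
            List<String> toRewrite = new ArrayList<>();
            for (Map.Entry<String, Long> entry : currentVersionByUuid.entrySet()) {
                Long previous = previousVersionByUuid.get(entry.getKey());
                if (previous == null || previous.longValue() != entry.getValue().longValue()) {
                    toRewrite.add(entry.getKey()); // new index, or its metadata version moved
                }
            }
            return toRewrite;
        }
    }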
diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java
index b7a178972fa72..045c1197dc5a8 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java
@@ -21,7 +21,6 @@
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexWriter;
@@ -33,9 +32,9 @@
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.cli.EnvironmentAwareCommand;
 import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
@@ -49,7 +48,6 @@
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeMetaData;
@@ -60,24 +58,20 @@
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TruncateTranslogAction;
-import org.elasticsearch.indices.IndicesModule;

 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.PrintWriter;
-import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.stream.StreamSupport;

-public class RemoveCorruptedShardDataCommand extends EnvironmentAwareCommand {
+public class RemoveCorruptedShardDataCommand extends ElasticsearchNodeCommand {

     private static final Logger logger = LogManager.getLogger(RemoveCorruptedShardDataCommand.class);
@@ -88,7 +82,6 @@ public class RemoveCorruptedShardDataCommand extends EnvironmentAwareCommand {
     private final RemoveCorruptedLuceneSegmentsAction removeCorruptedLuceneSegmentsAction;
     private final TruncateTranslogAction truncateTranslogAction;
-    private final NamedXContentRegistry namedXContentRegistry;

     public RemoveCorruptedShardDataCommand() {
         super("Removes corrupted shard files");
@@ -106,11 +99,6 @@ public RemoveCorruptedShardDataCommand() {

         parser.accepts(TRUNCATE_CLEAN_TRANSLOG_FLAG, "Truncate the translog even if it is not corrupt");

-        namedXContentRegistry = new NamedXContentRegistry(
-            Stream.of(ClusterModule.getNamedXWriteables().stream(), IndicesModule.getNamedXContents().stream())
-                .flatMap(Function.identity())
-                .collect(Collectors.toList()));
-
         removeCorruptedLuceneSegmentsAction = new RemoveCorruptedLuceneSegmentsAction();
         truncateTranslogAction = new TruncateTranslogAction(namedXContentRegistry);
     }
@@ -130,11 +118,12 @@ protected Path getPath(String dirValue) {
         return PathUtils.get(dirValue, "", "");
     }

-    protected void findAndProcessShardPath(OptionSet options, Environment environment, CheckedConsumer<ShardPath, IOException> consumer)
+    protected void findAndProcessShardPath(OptionSet options, Environment environment, Path[] dataPaths, ClusterState clusterState,
+                                           CheckedConsumer<ShardPath, IOException> consumer)
         throws IOException {
         final Settings settings = environment.settings();

-        final String indexName;
+        final IndexMetaData indexMetaData;
         final int shardId;

         if (options.has(folderOption)) {
@@ -146,65 +135,48 @@ protected void findAndProcessShardPath(OptionSet options, Environment environmen
                 throw new ElasticsearchException("index directory [" + indexPath + "], must exist and be a directory");
             }

-            final IndexMetaData indexMetaData =
-                IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardParent);
-
             final String shardIdFileName = path.getFileName().toString();
+            final String indexUUIDFolderName = shardParent.getFileName().toString();
             if (Files.isDirectory(path) && shardIdFileName.chars().allMatch(Character::isDigit) // SHARD-ID path element check
                 && NodeEnvironment.INDICES_FOLDER.equals(shardParentParent.getFileName().toString()) // `indices` check
             ) {
                 shardId = Integer.parseInt(shardIdFileName);
-                indexName = indexMetaData.getIndex().getName();
+                indexMetaData = StreamSupport.stream(clusterState.metaData().indices().values().spliterator(), false)
+                    .map(imd -> imd.value)
+                    .filter(imd -> imd.getIndexUUID().equals(indexUUIDFolderName)).findFirst()
+                    .orElse(null);
             } else {
                 throw new ElasticsearchException("Unable to resolve shard id. Wrong folder structure at [ " + path.toString()
                     + " ], expected .../indices/[INDEX-UUID]/[SHARD-ID]");
             }
         } else {
             // otherwise resolve shardPath based on the index name and shard id
-            indexName = Objects.requireNonNull(indexNameOption.value(options), "Index name is required");
+            String indexName = Objects.requireNonNull(indexNameOption.value(options), "Index name is required");
             shardId = Objects.requireNonNull(shardIdOption.value(options), "Shard ID is required");
+            indexMetaData = clusterState.metaData().index(indexName);
         }

-        try (NodeEnvironment.NodeLock nodeLock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) {
-            final NodeEnvironment.NodePath[] nodePaths = nodeLock.getNodePaths();
-            for (NodeEnvironment.NodePath nodePath : nodePaths) {
-                if (Files.exists(nodePath.indicesPath)) {
-                    // have to scan all index uuid folders to resolve from index name
-                    try (DirectoryStream<Path> stream = Files.newDirectoryStream(nodePath.indicesPath)) {
-                        for (Path file : stream) {
-                            if (Files.exists(file.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) {
-                                continue;
-                            }
-
-                            final IndexMetaData indexMetaData =
-                                IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, file);
-                            if (indexMetaData == null) {
-                                continue;
-                            }
-                            final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
-                            final Index index = indexMetaData.getIndex();
-                            if (indexName.equals(index.getName()) == false) {
-                                continue;
-                            }
-                            final ShardId shId = new ShardId(index, shardId);
-
-                            final Path shardPathLocation = nodePath.resolve(shId);
-                            if (Files.exists(shardPathLocation) == false) {
-                                continue;
-                            }
-                            final ShardPath shardPath = ShardPath.loadShardPath(logger, shId, indexSettings.customDataPath(),
-                                new Path[]{shardPathLocation}, nodePath.path);
-                            if (shardPath != null) {
-                                consumer.accept(shardPath);
-                                return;
-                            }
-                        }
-                    }
+        if (indexMetaData == null) {
+            throw new ElasticsearchException("Unable to find index in cluster state");
+        }
+
+        final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
+        final Index index = indexMetaData.getIndex();
+        final ShardId shId = new ShardId(index, shardId);
+
+        for (Path dataPath : dataPaths) {
+            final Path shardPathLocation = dataPath
+                .resolve(NodeEnvironment.INDICES_FOLDER)
+                .resolve(index.getUUID())
+                .resolve(Integer.toString(shId.id()));
+            if (Files.exists(shardPathLocation)) {
+                final ShardPath shardPath = ShardPath.loadShardPath(logger, shId, indexSettings.customDataPath(),
+                    new Path[]{shardPathLocation}, dataPath);
+                if (shardPath != null) {
+                    consumer.accept(shardPath);
+                    return;
                 }
             }
-        } catch (LockObtainFailedException lofe) {
-            throw new ElasticsearchException("Failed to lock node's directory [" + lofe.getMessage()
-                + "], is Elasticsearch still running ?");
         }
     }
@@ -256,11 +228,9 @@ private static void confirm(String msg, Terminal terminal) {
         }
     }

-    private void warnAboutESShouldBeStopped(Terminal terminal) {
+    private void warnAboutIndexBackup(Terminal terminal) {
         terminal.println("-----------------------------------------------------------------------");
         terminal.println("");
-        terminal.println(" WARNING: Elasticsearch MUST be stopped before running this tool.");
-        terminal.println("");
         terminal.println(" Please make a complete backup of your index before using this tool.");
         terminal.println("");
         terminal.println("-----------------------------------------------------------------------");
@@ -268,10 +238,12 @@ private void warnAboutESShouldBeStopped(Terminal terminal) {

     // Visible for testing
     @Override
-    public void execute(Terminal terminal, OptionSet options, Environment environment) throws Exception {
-        warnAboutESShouldBeStopped(terminal);
+    public void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment environment) throws IOException {
+        warnAboutIndexBackup(terminal);

-        findAndProcessShardPath(options, environment, shardPath -> {
+        final ClusterState clusterState = loadTermAndClusterState(createPersistedClusterStateService(dataPaths), environment).v2();
+
+        findAndProcessShardPath(options, environment, dataPaths, clusterState, shardPath -> {
             final Path indexPath = shardPath.resolveIndex();
             final Path translogPath = shardPath.resolveTranslog();
             if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) {
@@ -320,7 +292,7 @@ public void write(int b) {
                 terminal.println("Opening translog at " + translogPath);
                 terminal.println("");
                 try {
-                    translogCleanStatus = truncateTranslogAction.getCleanStatus(shardPath, indexDir);
+                    translogCleanStatus = truncateTranslogAction.getCleanStatus(shardPath, clusterState, indexDir);
                 } catch (Exception e) {
                     terminal.println(e.getMessage());
                     throw e;
@@ -464,11 +436,8 @@ private void newAllocationId(ShardPath shardPath, Terminal terminal) throws IOEx
         printRerouteCommand(shardPath, terminal, true);
     }

-    private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean allocateStale) throws IOException {
-        final IndexMetaData indexMetaData =
-            IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry,
-                shardPath.getDataPath().getParent());
-
+    private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean allocateStale)
+        throws IOException {
         final Path nodePath = getNodePath(shardPath);
         final NodeMetaData nodeMetaData =
             NodeMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodePath);
@@ -478,7 +447,7 @@ private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean
         }

         final String nodeId = nodeMetaData.nodeId();
-        final String index = indexMetaData.getIndex().getName();
+        final String index = shardPath.getShardId().getIndexName();
         final int id = shardPath.getShardId().id();
         final AllocationCommands commands = new AllocationCommands(
             allocateStale
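The refactoring above inverts the lookup: instead of scanning every `indices/*` folder and parsing per-index state files, the command resolves `IndexMetaData` from the already-loaded cluster state (by UUID folder name or by index name) and then probes a fixed location under each data path. The core of that probe, reduced to plain `java.nio` as a hypothetical stand-alone helper; the constant mirrors `NodeEnvironment.INDICES_FOLDER`:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Stream;

final class ShardPathProbe {
    private static final String INDICES_FOLDER = "indices"; // mirrors NodeEnvironment.INDICES_FOLDER

    // Given the index UUID and shard id, look for <dataPath>/indices/<uuid>/<shardId>
    // under each configured data path and return the first directory that exists.
    static Optional<Path> findShardDirectory(Path[] dataPaths, String indexUUID, int shardId) {
        return Stream.of(dataPaths)
            .map(dataPath -> dataPath.resolve(INDICES_FOLDER)
                .resolve(indexUUID)
                .resolve(Integer.toString(shardId)))
            .filter(Files::isDirectory)
            .findFirst();
    }
}
```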
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java
index e6581d0359d11..9480ee3c1e1f3 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java
@@ -26,6 +26,7 @@
 import org.apache.lucene.store.Directory;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.Terminal;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
@@ -63,6 +64,7 @@ public TruncateTranslogAction(NamedXContentRegistry namedXContentRegistry) {
     }

     public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus(ShardPath shardPath,
+                                                                                     ClusterState clusterState,
                                                                                      Directory indexDirectory) throws IOException {
         final Path indexPath = shardPath.resolveIndex();
         final Path translogPath = shardPath.resolveTranslog();
@@ -83,7 +85,7 @@ public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus
             throw new ElasticsearchException("shard must have a valid translog UUID but got: [null]");
         }

-        final boolean clean = isTranslogClean(shardPath, translogUUID);
+        final boolean clean = isTranslogClean(shardPath, clusterState, translogUUID);

         if (clean) {
             return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null);
@@ -166,13 +168,12 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec
         IOUtils.fsync(translogPath, true);
     }

-    private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException {
+    private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState, String translogUUID) throws IOException {
         // perform clean check of translog instead of corrupted marker file
         try {
             final Path translogPath = shardPath.resolveTranslog();
             final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
-            final IndexMetaData indexMetaData =
-                IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardPath.getDataPath().getParent());
+            final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(shardPath.getShardId().getIndex());
             final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY);
             final TranslogConfig translogConfig = new TranslogConfig(shardPath.getShardId(), translogPath,
                 indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
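With this change `isTranslogClean` no longer re-reads index metadata from disk on every check; it resolves it from the single `ClusterState` loaded up front, and `getIndexSafe` fails loudly instead of handing back null. A tiny hypothetical helper showing the same lookup contract, independent of any Elasticsearch types:

```java
import java.util.Map;
import java.util.NoSuchElementException;

final class SafeLookup {
    // Stand-in for MetaData.getIndexSafe(Index): return the value or throw,
    // never return null to the caller.
    static <K, V> V getSafe(Map<K, V> map, K key) {
        final V value = map.get(key);
        if (value == null) {
            throw new NoSuchElementException("no entry for [" + key + "]");
        }
        return value;
    }
}
```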
diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java
index d0e8c5efc5572..223be1b45d571 100644
--- a/server/src/main/java/org/elasticsearch/node/Node.java
+++ b/server/src/main/java/org/elasticsearch/node/Node.java
@@ -48,7 +48,6 @@
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.metadata.AliasValidator;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
-import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
 import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
@@ -61,7 +60,6 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.inject.Injector;
@@ -96,8 +94,7 @@
 import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.gateway.GatewayModule;
 import org.elasticsearch.gateway.GatewayService;
-import org.elasticsearch.gateway.LucenePersistedStateFactory;
-import org.elasticsearch.gateway.LucenePersistedStateFactory.LegacyLoader;
+import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.http.HttpServerTransport;
 import org.elasticsearch.index.IndexSettings;
@@ -410,18 +407,8 @@ protected Node(
                 ClusterModule.getNamedXWriteables().stream())
                 .flatMap(Function.identity()).collect(toList()));
             final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
-            final LucenePersistedStateFactory lucenePersistedStateFactory
-                = new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry, bigArrays, new LegacyLoader() {
-                @Override
-                public Tuple<Manifest, MetaData> loadClusterState() throws IOException {
-                    return metaStateService.loadFullState();
-                }
-
-                @Override
-                public void clean() throws IOException {
-                    metaStateService.deleteAll();
-                }
-            });
+            final PersistedClusterStateService lucenePersistedStateFactory
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays);

             // collect engine factory providers from server and from plugins
             final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
@@ -559,7 +546,7 @@ public void clean() throws IOException {
                 b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                 b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                 b.bind(MetaStateService.class).toInstance(metaStateService);
-                b.bind(LucenePersistedStateFactory.class).toInstance(lucenePersistedStateFactory);
+                b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);
                 b.bind(IndicesService.class).toInstance(indicesService);
                 b.bind(AliasValidator.class).toInstance(aliasValidator);
                 b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService);
@@ -705,9 +692,9 @@ public Node start() throws NodeValidationException {

         // Load (and maybe upgrade) the metadata stored on disk
         final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
-        gatewayMetaState.start(settings(), transportService, clusterService,
+        gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
             injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class),
-            injector.getInstance(LucenePersistedStateFactory.class));
+            injector.getInstance(PersistedClusterStateService.class));
         if (Assertions.ENABLED) {
             try {
                 assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
index 5e1bc3b9473d0..e4560d0613ccd 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
@@ -986,7 +986,6 @@ public void testClusterCannotFormWithFailingJoinValidation() {
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO implement cluster detaching
     public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessException {
         try (Cluster cluster1 = new Cluster(randomIntBetween(1, 3))) {
             cluster1.runRandomly();
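Worth calling out in the `Node` hunk above: dropping the `LegacyLoader` callback removes the indirection where the factory had to reach back into `MetaStateService` during construction; the service is now built with a plain three-argument constructor. A schematic before/after of that dependency shape (all names hypothetical, not the real classes):

```java
// Before: the persisted-state factory could not be built without a callback into the
// legacy manifest-based storage. After: a plain constructor, no callback.
interface LegacyLoader {
    String loadClusterState() throws java.io.IOException; // stand-in for Tuple<Manifest, MetaData>
    void clean() throws java.io.IOException;
}

final class ServiceBefore {
    ServiceBefore(LegacyLoader loader) { /* consults the loader when no Lucene state exists */ }
}

final class ServiceAfter {
    ServiceAfter() { /* reads and writes only its own Lucene-backed directory */ }
}
```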
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java
index b772e290a6ead..a487c2301c458 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java
@@ -19,21 +19,18 @@
 package org.elasticsearch.cluster.coordination;

 import joptsimple.OptionSet;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.cli.MockTerminal;
-import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeMetaData;
 import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
@@ -43,6 +40,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.stream.Stream;

 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -50,9 +48,7 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;

-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO unsafe bootstrapping and cluster detach
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
 public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
@@ -147,37 +143,17 @@ public void testBootstrapNoNodeMetaData() throws IOException {
             NodeMetaData.FORMAT.cleanupOldFiles(-1, nodeEnvironment.nodeDataPaths());
         }

-        expectThrows(() -> unsafeBootstrap(environment), UnsafeBootstrapMasterCommand.NO_NODE_METADATA_FOUND_MSG);
+        expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
     }

     public void testBootstrapNotBootstrappedCluster() throws Exception {
         String node = internalCluster().startNode(
-                Settings.builder()
-                        .put(Node.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") // to ensure quick node startup
-                        .build());
-        assertBusy(() -> {
-            ClusterState state = client().admin().cluster().prepareState().setLocal(true)
-                    .execute().actionGet().getState();
-            assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-        });
-
-        Settings dataPathSettings = internalCluster().dataPathSettings(node);
-
-        internalCluster().stopRandomDataNode();
-
-        Environment environment = TestEnvironment.newEnvironment(
-            Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.GLOBAL_GENERATION_MISSING_MSG);
-    }
-
-    public void testDetachNotBootstrappedCluster() throws Exception {
-        String node = internalCluster().startNode(
-                Settings.builder()
-                        .put(Node.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") // to ensure quick node startup
-                        .build());
+            Settings.builder()
+                .put(Node.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") // to ensure quick node startup
+                .build());
         assertBusy(() -> {
             ClusterState state = client().admin().cluster().prepareState().setLocal(true)
-                    .execute().actionGet().getState();
+                .execute().actionGet().getState();
             assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
         });

@@ -187,24 +163,10 @@ public void testDetachNotBootstrappedCluster() throws Exception {

         Environment environment = TestEnvironment.newEnvironment(
             Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.GLOBAL_GENERATION_MISSING_MSG);
-    }
-
-    public void testBootstrapNoManifestFile() throws IOException {
-        internalCluster().setBootstrapMasterNodeIndex(0);
-        String node = internalCluster().startNode();
-        Settings dataPathSettings = internalCluster().dataPathSettings(node);
-        ensureStableCluster(1);
-        NodeEnvironment nodeEnvironment = internalCluster().getMasterNodeInstance(NodeEnvironment.class);
-        internalCluster().stopRandomDataNode();
-        Environment environment = TestEnvironment.newEnvironment(
-            Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        Manifest.FORMAT.cleanupOldFiles(-1, nodeEnvironment.nodeDataPaths());
-
-        expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_MANIFEST_FILE_FOUND_MSG);
+        expectThrows(() -> unsafeBootstrap(environment), UnsafeBootstrapMasterCommand.EMPTY_LAST_COMMITTED_VOTING_CONFIG_MSG);
     }

-    public void testDetachNoManifestFile() throws IOException {
+    public void testBootstrapNoClusterState() throws IOException {
         internalCluster().setBootstrapMasterNodeIndex(0);
         String node = internalCluster().startNode();
         Settings dataPathSettings = internalCluster().dataPathSettings(node);
@@ -213,39 +175,27 @@ public void testDetachNoManifestFile() throws IOException {
         internalCluster().stopRandomDataNode();
         Environment environment = TestEnvironment.newEnvironment(
             Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        Manifest.FORMAT.cleanupOldFiles(-1, nodeEnvironment.nodeDataPaths());
+        IOUtils.rm(Stream.of(nodeEnvironment.nodeDataPaths())
+            .map(path -> path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))
+            .toArray(Path[]::new));

-        expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.NO_MANIFEST_FILE_FOUND_MSG);
+        expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.CS_MISSING_MSG);
     }

-    public void testBootstrapNoMetaData() throws IOException {
+    public void testDetachNoClusterState() throws IOException {
         internalCluster().setBootstrapMasterNodeIndex(0);
         String node = internalCluster().startNode();
         Settings dataPathSettings = internalCluster().dataPathSettings(node);
         ensureStableCluster(1);
         NodeEnvironment nodeEnvironment = internalCluster().getMasterNodeInstance(NodeEnvironment.class);
         internalCluster().stopRandomDataNode();
-
         Environment environment = TestEnvironment.newEnvironment(
             Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        MetaData.FORMAT.cleanupOldFiles(-1, nodeEnvironment.nodeDataPaths());
+        IOUtils.rm(Stream.of(nodeEnvironment.nodeDataPaths())
+            .map(path -> path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))
+            .toArray(Path[]::new));

-        expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_GLOBAL_METADATA_MSG);
-    }
-
-    public void testDetachNoMetaData() throws IOException {
-        internalCluster().setBootstrapMasterNodeIndex(0);
-        String node = internalCluster().startNode();
-        Settings dataPathSettings = internalCluster().dataPathSettings(node);
-        ensureStableCluster(1);
-        NodeEnvironment nodeEnvironment = internalCluster().getMasterNodeInstance(NodeEnvironment.class);
-        internalCluster().stopRandomDataNode();
-
-        Environment environment = TestEnvironment.newEnvironment(
-            Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        MetaData.FORMAT.cleanupOldFiles(-1, nodeEnvironment.nodeDataPaths());
-
-        expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.NO_GLOBAL_METADATA_MSG);
+        expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.CS_MISSING_MSG);
     }

     public void testBootstrapAbortedByUser() throws IOException {
@@ -324,10 +274,11 @@ public void test3MasterNodes2Failed() throws Exception {

         logger.info("--> unsafely-bootstrap 1st master-eligible node");
         MockTerminal terminal = unsafeBootstrap(environmentMaster1);
-        MetaData metaData = MetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodeEnvironment.nodeDataPaths());
+        MetaData metaData = ElasticsearchNodeCommand.createPersistedClusterStateService(nodeEnvironment.nodeDataPaths())
+            .loadBestOnDiskState().metaData;
         assertThat(terminal.getOutput(), containsString(
-                String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT,
-                        metaData.coordinationMetaData().term(), metaData.version())));
+            String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT,
+                metaData.coordinationMetaData().term(), metaData.version())));

         logger.info("--> start 1st master-eligible node");
         internalCluster().startMasterOnlyNode(master1DataPathSettings);
@@ -402,14 +353,15 @@ public boolean clearData(String nodeName) {
         internalCluster().startDataOnlyNode(dataNodeDataPathSettings);
         ensureStableCluster(2);

-        logger.info("--> verify that the dangling index exists and has green status");
-        assertBusy(() -> {
-            assertThat(indexExists("test"), equalTo(true));
-        });
-        ensureGreen("test");
-
-        logger.info("--> verify the doc is there");
-        assertThat(client().prepareGet("test", "1").execute().actionGet().isExists(), equalTo(true));
+        // TODO: @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // dangling indices
+//        logger.info("--> verify that the dangling index exists and has green status");
+//        assertBusy(() -> {
+//            assertThat(indexExists("test"), equalTo(true));
+//        });
+//        ensureGreen("test");
+//
+//        logger.info("--> verify the doc is there");
+//        assertThat(client().prepareGet("test", "1").execute().actionGet().isExists(), equalTo(true));
     }

     public void testNoInitialBootstrapAfterDetach() throws Exception {
@@ -461,65 +413,4 @@ public void testCanRunUnsafeBootstrapAfterErroneousDetachWithoutLoosingMetaData(
         assertThat(state.metaData().settings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()),
             equalTo("1234kb"));
     }
-
-    private static class SimulatedDeleteFailureException extends RuntimeException {
-    }
-
-    public void testCleanupOldMetaDataFails() throws Exception {
-        // establish some metadata.
-        internalCluster().setBootstrapMasterNodeIndex(0);
-        String node = internalCluster().startNode();
-        Settings dataPathSettings = internalCluster().dataPathSettings(node);
-        final Environment environment = TestEnvironment.newEnvironment(
-            Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        internalCluster().stopRandomDataNode();
-
-        // find data paths
-        Path[] dataPaths;
-        try (NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment)) {
-            dataPaths = nodeEnvironment.nodeDataPaths();
-        }
-
-        NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
-
-        final Manifest originalManifest = loadLatestManifest(dataPaths, namedXContentRegistry);
-        final MetaData originalMetaData = loadMetaData(dataPaths, namedXContentRegistry, originalManifest);
-
-        executeCommand(new UnsafeBootstrapMasterCommand() {
-            @Override
-            protected void cleanUpOldMetaData(Terminal terminal, Path[] dataPaths, long newGeneration) {
-                throw new SimulatedDeleteFailureException();
-            }
-        }, environment, false);
-
-        // check original meta-data left untouched.
-        assertEquals(loadMetaData(dataPaths, namedXContentRegistry, originalManifest).clusterUUID(), originalMetaData.clusterUUID());
-
-        // check that we got new clusterUUID despite deletion failing
-        final Manifest secondManifest = loadLatestManifest(dataPaths, namedXContentRegistry);
-        final MetaData secondMetaData = loadMetaData(dataPaths, namedXContentRegistry, secondManifest);
-        assertThat(secondManifest.getGlobalGeneration(), greaterThan(originalManifest.getGlobalGeneration()));
-        assertNotEquals(originalMetaData.clusterUUID(), secondMetaData.clusterUUID());
-
-        // check that a new run will cleanup.
-        executeCommand(new UnsafeBootstrapMasterCommand(), environment, false);
-
-        assertNull(loadMetaData(dataPaths, namedXContentRegistry, originalManifest));
-        assertNull(loadMetaData(dataPaths, namedXContentRegistry, secondManifest));
-
-        final Manifest finalManifest = loadLatestManifest(dataPaths, namedXContentRegistry);
-        final MetaData finalMetaData = loadMetaData(dataPaths, namedXContentRegistry, finalManifest);
-
-        assertNotNull(finalMetaData);
-        assertNotEquals(secondMetaData.clusterUUID(), finalMetaData.clusterUUID());
-    }
-
-    private Manifest loadLatestManifest(Path[] dataPaths, NamedXContentRegistry namedXContentRegistry) throws IOException {
-        return Manifest.FORMAT.loadLatestState(logger, namedXContentRegistry, dataPaths);
-    }
-
-    private MetaData loadMetaData(Path[] dataPaths, NamedXContentRegistry namedXContentRegistry, Manifest manifest) {
-        return MetaData.FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(), dataPaths);
-    }
 }
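The re-enabled tests above simulate "no on-disk cluster state" by removing the metadata directory under each data path with `IOUtils.rm`; `PersistedClusterStateService.METADATA_DIRECTORY_NAME` is the real constant from this PR, but the helper below is a hypothetical stand-alone equivalent using only `java.nio`:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class WipeClusterState {
    static void wipe(Path[] dataPaths, String metadataDirName) throws IOException {
        for (Path dataPath : dataPaths) {
            Path metadataDir = dataPath.resolve(metadataDirName);
            if (Files.exists(metadataDir) == false) {
                continue;
            }
            try (Stream<Path> walk = Files.walk(metadataDir)) {
                List<Path> all = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                for (Path p : all) {
                    Files.delete(p); // children first, then the directory itself
                }
            }
        }
    }
}
```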
diff --git a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java
index 1d864201c1d76..003957f3199e9 100644
--- a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java
+++ b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java
@@ -27,13 +27,10 @@

 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.not;
-import static org.mockito.Matchers.contains;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class NodeRepurposeCommandIT extends ESIntegTestCase {

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO node repurposing
     public void testRepurpose() throws Exception {
         final String indexName = "test-repurpose";

@@ -46,7 +43,6 @@ public void testRepurpose() throws Exception {
                 .put("index.number_of_shards", 1)
                 .put("index.number_of_replicas", 0)
             ).get();
-        final String indexUUID = resolveIndex(indexName).getUUID();

         logger.info("--> indexing a simple document");
         client().prepareIndex(indexName).setId("1").setSource("field1", "value1").get();
@@ -83,10 +79,10 @@ public void testRepurpose() throws Exception {
         );

         logger.info("--> Repurposing node 1");
-        executeRepurposeCommand(noMasterNoDataSettingsForDataNode, indexUUID, 1);
+        executeRepurposeCommand(noMasterNoDataSettingsForDataNode, 1, 1);

         ElasticsearchException lockedException = expectThrows(ElasticsearchException.class,
-            () -> executeRepurposeCommand(noMasterNoDataSettingsForMasterNode, indexUUID, 1)
+            () -> executeRepurposeCommand(noMasterNoDataSettingsForMasterNode, 1, 1)
         );

         assertThat(lockedException.getMessage(), containsString(NodeRepurposeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG));
@@ -102,7 +98,7 @@ public void testRepurpose() throws Exception {
         internalCluster().stopRandomNode(s -> true);
         internalCluster().stopRandomNode(s -> true);

-        executeRepurposeCommand(noMasterNoDataSettingsForMasterNode, indexUUID, 0);
+        executeRepurposeCommand(noMasterNoDataSettingsForMasterNode, 1, 0);

         // by restarting as master and data node, we can check that the index definition was really deleted and also that the tool
         // does not mess things up so much that the nodes cannot boot as master or data node any longer.
@@ -115,14 +111,13 @@ public void testRepurpose() throws Exception {
         assertFalse(indexExists(indexName));
     }

-    private void executeRepurposeCommand(Settings settings, String indexUUID, int expectedShardCount) throws Exception {
+    private void executeRepurposeCommand(Settings settings, int expectedIndexCount,
+                                         int expectedShardCount) throws Exception {
         boolean verbose = randomBoolean();
         Settings settingsWithPath = Settings.builder().put(internalCluster().getDefaultSettings()).put(settings).build();
-        int expectedIndexCount = TestEnvironment.newEnvironment(settingsWithPath).dataFiles().length;
         Matcher<String> matcher = allOf(
-            containsString(NodeRepurposeCommand.noMasterMessage(1, expectedShardCount, expectedIndexCount)),
-            not(contains(NodeRepurposeCommand.PRE_V7_MESSAGE)),
-            NodeRepurposeCommandTests.conditionalNot(containsString(indexUUID), verbose == false));
+            containsString(NodeRepurposeCommand.noMasterMessage(expectedIndexCount, expectedShardCount, 0)),
+            NodeRepurposeCommandTests.conditionalNot(containsString("test-repurpose"), verbose == false));
         NodeRepurposeCommandTests.verifySuccess(settingsWithPath, matcher, verbose);
     }
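The matchers above lean on `NodeRepurposeCommandTests.conditionalNot`, which negates a matcher only when the given condition holds, so verbose output must contain a string while quiet output must not. A stand-alone equivalent, assuming only Hamcrest on the classpath:

```java
import org.hamcrest.Matcher;
import static org.hamcrest.Matchers.not;

final class ConditionalMatchers {
    // Wrap the matcher in not(...) only when negate is true; otherwise pass it through.
    static <T> Matcher<T> conditionalNot(Matcher<T> matcher, boolean negate) {
        return negate ? not(matcher) : matcher;
    }
}
```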
diff --git a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java
index 8f713e57bf4da..d54fb051cd606 100644
--- a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java
+++ b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java
@@ -23,13 +23,15 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cli.MockTerminal;
 import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.Manifest;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
@@ -40,16 +42,13 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.stream.Stream;

 import static org.elasticsearch.env.NodeRepurposeCommand.NO_CLEANUP;
 import static org.elasticsearch.env.NodeRepurposeCommand.NO_DATA_TO_CLEAN_UP_FOUND;
 import static org.elasticsearch.env.NodeRepurposeCommand.NO_SHARD_DATA_TO_CLEAN_UP_FOUND;
-import static org.elasticsearch.env.NodeRepurposeCommand.PRE_V7_MESSAGE;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.not;

 public class NodeRepurposeCommandTests extends ESTestCase {
@@ -86,27 +85,32 @@ public void createNodePaths() throws IOException {
     }

     public void testEarlyExitNoCleanup() throws Exception {
-        createIndexDataFiles(dataMasterSettings, randomInt(10));
+        createIndexDataFiles(dataMasterSettings, randomInt(10), randomBoolean());

         verifyNoQuestions(dataMasterSettings, containsString(NO_CLEANUP));
         verifyNoQuestions(dataNoMasterSettings, containsString(NO_CLEANUP));
     }

     public void testNothingToCleanup() throws Exception {
-        verifyNoQuestions(noDataNoMasterSettings, allOf(containsString(NO_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
-        verifyNoQuestions(noDataMasterSettings,
-            allOf(containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
-
-        createManifest(null);
+        verifyNoQuestions(noDataNoMasterSettings, containsString(NO_DATA_TO_CLEAN_UP_FOUND));
+        verifyNoQuestions(noDataMasterSettings, containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND));
+
+        Environment environment = TestEnvironment.newEnvironment(noDataMasterSettings);
+        if (randomBoolean()) {
+            try (NodeEnvironment env = new NodeEnvironment(noDataMasterSettings, environment)) {
+                try (PersistedClusterStateService.Writer writer =
+                         ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) {
+                    writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE);
+                }
+            }
+        }

-        verifyNoQuestions(noDataNoMasterSettings, allOf(containsString(NO_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
-        verifyNoQuestions(noDataMasterSettings,
-            allOf(containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
+        verifyNoQuestions(noDataNoMasterSettings, containsString(NO_DATA_TO_CLEAN_UP_FOUND));
+        verifyNoQuestions(noDataMasterSettings, containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND));

-        createIndexDataFiles(dataMasterSettings, 0);
+        createIndexDataFiles(dataMasterSettings, 0, randomBoolean());

-        verifyNoQuestions(noDataMasterSettings,
-            allOf(containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
+        verifyNoQuestions(noDataMasterSettings, containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND));
     }
@@ -119,33 +123,20 @@ public void testLocked() throws IOException {
     }

     public void testCleanupAll() throws Exception {
-        Manifest oldManifest = createManifest(INDEX);
-        checkCleanupAll(not(containsString(PRE_V7_MESSAGE)));
-
-        Manifest newManifest = loadManifest();
-        assertThat(newManifest.getIndexGenerations().entrySet(), hasSize(0));
-        assertManifestIdenticalExceptIndices(oldManifest, newManifest);
-    }
-
-    public void testCleanupAllPreV7() throws Exception {
-        checkCleanupAll(containsString(PRE_V7_MESSAGE));
-    }
-
-    private void checkCleanupAll(Matcher<String> additionalOutputMatcher) throws Exception {
-        int shardCount = randomInt(10);
+        int shardCount = randomIntBetween(1, 10);
         boolean verbose = randomBoolean();
-        createIndexDataFiles(dataMasterSettings, shardCount);
+        boolean hasClusterState = randomBoolean();
+        createIndexDataFiles(dataMasterSettings, shardCount, hasClusterState);

         String messageText = NodeRepurposeCommand.noMasterMessage(
             1,
             environment.dataFiles().length*shardCount,
-            environment.dataFiles().length);
+            0);

         Matcher<String> outputMatcher = allOf(
             containsString(messageText),
-            additionalOutputMatcher,
-            conditionalNot(containsString("testUUID"), verbose == false),
-            conditionalNot(containsString("testIndex"), verbose == false)
+            conditionalNot(containsString("testIndex"), verbose == false || hasClusterState == false),
+            conditionalNot(containsString("no name for uuid: testUUID"), verbose == false || hasClusterState)
         );

         verifyUnchangedOnAbort(noDataNoMasterSettings, outputMatcher, verbose);
@@ -162,18 +153,17 @@ private void checkCleanupAll(Matcher additionalOutputMatcher) throws Exc
     public void testCleanupShardData() throws Exception {
         int shardCount = randomIntBetween(1, 10);
         boolean verbose = randomBoolean();
-        Manifest manifest = randomBoolean() ? createManifest(INDEX) : null;
-
-        createIndexDataFiles(dataMasterSettings, shardCount);
+        boolean hasClusterState = randomBoolean();
+        createIndexDataFiles(dataMasterSettings, shardCount, hasClusterState);

         Matcher<String> matcher = allOf(
             containsString(NodeRepurposeCommand.shardMessage(environment.dataFiles().length * shardCount, 1)),
             conditionalNot(containsString("testUUID"), verbose == false),
-            conditionalNot(containsString("testIndex"), verbose == false)
+            conditionalNot(containsString("testIndex"), verbose == false || hasClusterState == false),
+            conditionalNot(containsString("no name for uuid: testUUID"), verbose == false || hasClusterState)
         );

-        verifyUnchangedOnAbort(noDataMasterSettings,
-            matcher, verbose);
+        verifyUnchangedOnAbort(noDataMasterSettings, matcher, verbose);

         // verify test setup
         expectThrows(IllegalStateException.class, () -> new NodeEnvironment(noDataMasterSettings, environment).close());
@@ -182,12 +172,6 @@ public void testCleanupShardData() throws Exception {

         //verify clean.
         new NodeEnvironment(noDataMasterSettings, environment).close();
-
-        if (manifest != null) {
-            Manifest newManifest = loadManifest();
-            assertThat(newManifest.getIndexGenerations().entrySet(), hasSize(1));
-            assertManifestIdenticalExceptIndices(manifest, newManifest);
-        }
     }

     static void verifySuccess(Settings settings, Matcher<String> outputMatcher, boolean verbose) throws Exception {
@@ -237,31 +221,22 @@ private static void executeRepurposeCommand(MockTerminal terminal, Settings sett
         nodeRepurposeCommand.testExecute(terminal, options, env);
     }

-    private Manifest createManifest(Index index) throws org.elasticsearch.gateway.WriteStateException {
-        Manifest manifest = new Manifest(randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100),
-            index != null ? Collections.singletonMap(index, randomLongBetween(1,100)) : Collections.emptyMap());
-        Manifest.FORMAT.writeAndCleanup(manifest, nodePaths);
-        return manifest;
-    }
-
-    private Manifest loadManifest() throws IOException {
-        return Manifest.FORMAT.loadLatestState(logger, new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), nodePaths);
-    }
-
-    private void assertManifestIdenticalExceptIndices(Manifest oldManifest, Manifest newManifest) {
-        assertEquals(oldManifest.getGlobalGeneration(), newManifest.getGlobalGeneration());
-        assertEquals(oldManifest.getClusterStateVersion(), newManifest.getClusterStateVersion());
-        assertEquals(oldManifest.getCurrentTerm(), newManifest.getCurrentTerm());
-    }
-
-    private void createIndexDataFiles(Settings settings, int shardCount) throws IOException {
+    private void createIndexDataFiles(Settings settings, int shardCount, boolean writeClusterState) throws IOException {
         int shardDataDirNumber = randomInt(10);
-        try (NodeEnvironment env = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
-            IndexMetaData.FORMAT.write(IndexMetaData.builder(INDEX.getName())
-                .settings(Settings.builder().put("index.version.created", Version.CURRENT))
-                .numberOfShards(1)
-                .numberOfReplicas(1)
-                .build(), env.indexPaths(INDEX));
+        Environment environment = TestEnvironment.newEnvironment(settings);
+        try (NodeEnvironment env = new NodeEnvironment(settings, environment)) {
+            if (writeClusterState) {
+                try (PersistedClusterStateService.Writer writer =
+                         ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) {
+                    writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT)
+                        .metaData(MetaData.builder().put(IndexMetaData.builder(INDEX.getName())
+                            .settings(Settings.builder().put("index.version.created", Version.CURRENT)
+                                .put(IndexMetaData.SETTING_INDEX_UUID, INDEX.getUUID()))
+                            .numberOfShards(1)
+                            .numberOfReplicas(1)).build())
+                        .build());
+                }
+            }
             for (Path path : env.indexPaths(INDEX)) {
                 for (int i = 0; i < shardCount; ++i) {
                     Files.createDirectories(path.resolve(Integer.toString(shardDataDirNumber)));
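The recurring setup across these test changes is: open a `PersistedClusterStateService.Writer` over the node's data paths, write one full cluster state containing the index metadata, close. Isolated below for reference; a sketch that assumes the ES server and test framework on the classpath, with placeholder index name and UUID:

```java
// Seed on-disk cluster state for a tool test. Mirrors the calls visible in this diff;
// "test-index" / "test-uuid" are placeholders.
private void seedClusterState(Settings settings) throws IOException {
    try (NodeEnvironment env = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
         PersistedClusterStateService.Writer writer =
             ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) {
        writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT)
            .metaData(MetaData.builder().put(IndexMetaData.builder("test-index")
                .settings(Settings.builder().put("index.version.created", Version.CURRENT)
                    .put(IndexMetaData.SETTING_INDEX_UUID, "test-uuid"))
                .numberOfShards(1)
                .numberOfReplicas(0)).build())
            .build());
    }
}
```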
diff --git a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java
index e7eb4c53dfec9..08256620e5d57 100644
--- a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java
+++ b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java
@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.env;

+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
@@ -43,6 +45,7 @@ public class OverrideNodeVersionCommandTests extends ESTestCase {

     private Environment environment;
     private Path[] nodePaths;
+    private final OptionSet noOptions = new OptionParser().parse();

     @Before
     public void createNodePaths() throws IOException {
@@ -57,7 +60,7 @@ public void testFailsOnEmptyPath() {
         final Path emptyPath = createTempDir();
         final MockTerminal mockTerminal = new MockTerminal();
         final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () ->
-            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, new Path[]{emptyPath}, environment));
+            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, new Path[]{emptyPath}, noOptions, environment));
         assertThat(elasticsearchException.getMessage(), equalTo(OverrideNodeVersionCommand.NO_METADATA_MESSAGE));
         expectThrows(IllegalStateException.class, () -> mockTerminal.readText(""));
     }
@@ -67,7 +70,7 @@ public void testFailsIfUnnecessary() throws WriteStateException {
         NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(randomAlphaOfLength(10), nodeVersion), nodePaths);
         final MockTerminal mockTerminal = new MockTerminal();
         final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () ->
-            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, environment));
+            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment));
         assertThat(elasticsearchException.getMessage(), allOf(
             containsString("compatible with current version"),
             containsString(Version.CURRENT.toString()),
@@ -82,7 +85,7 @@ public void testWarnsIfTooOld() throws Exception {
         final MockTerminal mockTerminal = new MockTerminal();
         mockTerminal.addTextInput("n\n");
         final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () ->
-            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, environment));
+            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment));
         assertThat(elasticsearchException.getMessage(), equalTo("aborted by user"));
         assertThat(mockTerminal.getOutput(), allOf(
             containsString("too old"),
@@ -104,7 +107,7 @@ public void testWarnsIfTooNew() throws Exception {
         final MockTerminal mockTerminal = new MockTerminal();
         mockTerminal.addTextInput(randomFrom("yy", "Yy", "n", "yes", "true", "N", "no"));
         final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () ->
-            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, environment));
+            new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment));
         assertThat(elasticsearchException.getMessage(), equalTo("aborted by user"));
         assertThat(mockTerminal.getOutput(), allOf(
             containsString("data loss"),
@@ -124,7 +127,7 @@ public void testOverwritesIfTooOld() throws Exception {
         NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, nodeVersion), nodePaths);
         final MockTerminal mockTerminal = new MockTerminal();
         mockTerminal.addTextInput(randomFrom("y", "Y"));
-        new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, environment);
+        new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment);
         assertThat(mockTerminal.getOutput(), allOf(
             containsString("too old"),
             containsString("data loss"),
@@ -145,7 +148,7 @@ public void testOverwritesIfTooNew() throws Exception {
         NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, nodeVersion), nodePaths);
         final MockTerminal mockTerminal = new MockTerminal();
         mockTerminal.addTextInput(randomFrom("y", "Y"));
-        new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, environment);
+        new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment);
         assertThat(mockTerminal.getOutput(), allOf(
             containsString("data loss"),
             containsString("You should not use this tool"),
@@ -172,7 +175,7 @@ public void testLenientlyIgnoresExtraFields() throws Exception {
         final MockTerminal mockTerminal = new MockTerminal();
         mockTerminal.addTextInput(randomFrom("y", "Y"));
-        new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, environment);
+        new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment);
         assertThat(mockTerminal.getOutput(), allOf(
             containsString("data loss"),
             containsString("You should not use this tool"),
diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java
index 5622ddf47fb81..da8a04ce10c8a 100644
--- a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java
+++ b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java
@@ -29,9 +29,7 @@
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.coordination.CoordinationState;
 import org.elasticsearch.cluster.metadata.IndexGraveyard;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -550,17 +548,16 @@ public void testHalfDeletedIndexImport() throws Exception {
     }

     private void restartNodesOnBrokenClusterState(ClusterState.Builder clusterStateBuilder) throws Exception {
-        Map<String, LucenePersistedStateFactory> lucenePersistedStateFactories = Stream.of(internalCluster().getNodeNames())
+        Map<String, PersistedClusterStateService> lucenePersistedStateFactories = Stream.of(internalCluster().getNodeNames())
             .collect(Collectors.toMap(Function.identity(),
-                nodeName -> internalCluster().getInstance(LucenePersistedStateFactory.class, nodeName)));
+                nodeName -> internalCluster().getInstance(PersistedClusterStateService.class, nodeName)));
         final ClusterState clusterState = clusterStateBuilder.build();
         internalCluster().fullRestart(new RestartCallback(){
             @Override
             public Settings onNodeStopped(String nodeName) throws Exception {
-                final LucenePersistedStateFactory lucenePersistedStateFactory = lucenePersistedStateFactories.get(nodeName);
-                try (CoordinationState.PersistedState persistedState = lucenePersistedStateFactory.loadPersistedState(
-                    (v, m) -> ClusterState.builder(ClusterName.DEFAULT).version(v).metaData(m).build())) {
-                    persistedState.setLastAcceptedState(clusterState);
+                final PersistedClusterStateService lucenePersistedStateFactory = lucenePersistedStateFactories.get(nodeName);
+                try (PersistedClusterStateService.Writer writer = lucenePersistedStateFactory.createWriter()) {
+                    writer.writeFullStateAndCommit(clusterState.term(), clusterState);
                 }
                 return super.onNodeStopped(nodeName);
             }
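Note on `restartNodesOnBrokenClusterState` above: the fixture captures each node's `PersistedClusterStateService` before the restart, because the instances cannot be looked up while nodes are stopped, and then writes the broken state from `onNodeStopped`. The generic shape of that capture pattern, with hypothetical names:

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CaptureBeforeRestart {
    interface Cluster {
        String[] nodeNames();
        <T> T instance(Class<T> type, String nodeName);
    }

    // Snapshot a per-node service instance keyed by node name, to be used later
    // while the nodes themselves are down.
    static <T> Map<String, T> capture(Cluster cluster, Class<T> type) {
        return Stream.of(cluster.nodeNames())
            .collect(Collectors.toMap(Function.identity(), name -> cluster.instance(type, name)));
    }
}
```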
diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java
index ddac6bb98a5be..868836bb30f61 100644
--- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java
@@ -25,7 +25,6 @@
 import org.elasticsearch.cluster.coordination.CoordinationMetaData;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
 import org.elasticsearch.cluster.coordination.CoordinationState;
-import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -33,12 +32,16 @@
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.test.ESTestCase;

 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collections;

 import static org.hamcrest.Matchers.equalTo;
@@ -71,7 +74,7 @@ private CoordinationState.PersistedState newGatewayPersistedState() {
         final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
         gateway.start(settings, nodeEnvironment, xContentRegistry());
         final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
-        assertThat(persistedState, not(instanceOf(InMemoryPersistedState.class)));
+        assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class));
         return persistedState;
     }
@@ -274,4 +277,36 @@ public void testMarkAcceptedConfigAsCommitted() throws IOException {
         }
     }

+    public void testStatePersistedOnLoad() throws IOException {
+        // open LucenePersistedState to make sure that cluster state is written out to each data path
+        final PersistedClusterStateService persistedClusterStateService =
+            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
+        final ClusterState state = createClusterState(randomNonNegativeLong(),
+            MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build());
+        try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState(
+            persistedClusterStateService.createWriter(), 42L, state)) {
+
+        }
+
+        nodeEnvironment.close();
+
+        // verify that the freshest state was rewritten to each data path
+        for (Path path : nodeEnvironment.nodeDataPaths()) {
+            Settings settings = Settings.builder()
+                .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
+                .put(Environment.PATH_DATA_SETTING.getKey(), path.toString()).build();
+            try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
+                final PersistedClusterStateService newPersistedClusterStateService =
+                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
+                final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
+                assertFalse(onDiskState.empty());
+                assertThat(onDiskState.currentTerm, equalTo(42L));
+                assertClusterStateEqual(state,
+                    ClusterState.builder(ClusterName.DEFAULT)
+                        .version(onDiskState.lastAcceptedVersion)
+                        .metaData(onDiskState.metaData).build());
+            }
+        }
+    }
+
 }
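The new `testStatePersistedOnLoad` above verifies durability per data path by standing up a one-path `NodeEnvironment` for each directory and loading from it in isolation. The skeleton of that loop, with assertions elided; it assumes the ES test scaffolding (`createTempDir`, `xContentRegistry`) from `ESTestCase`:

```java
// For each data path, reopen just that path and confirm the written state is there.
for (Path path : nodeEnvironment.nodeDataPaths()) {
    Settings settings = Settings.builder()
        .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
        .put(Environment.PATH_DATA_SETTING.getKey(), path.toString())
        .build();
    try (NodeEnvironment singlePathEnv = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
        PersistedClusterStateService.OnDiskState onDiskState =
            new PersistedClusterStateService(singlePathEnv, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE)
                .loadBestOnDiskState();
        // assert onDiskState.currentTerm / onDiskState.metaData match what was written
    }
}
```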
diff --git a/server/src/test/java/org/elasticsearch/gateway/LucenePersistedStateFactoryTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
similarity index 60%
rename from server/src/test/java/org/elasticsearch/gateway/LucenePersistedStateFactoryTests.java
rename to server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
index 677c552143af2..92541b20efe99 100644
--- a/server/src/test/java/org/elasticsearch/gateway/LucenePersistedStateFactoryTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
@@ -30,7 +30,6 @@
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData;
-import org.elasticsearch.cluster.coordination.CoordinationState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.UUIDs;
@@ -41,13 +40,13 @@
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.ESTestCase;

 import java.io.IOError;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,10 +62,10 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;

-public class LucenePersistedStateFactoryTests extends ESTestCase {
+public class PersistedClusterStateServiceTests extends ESTestCase {

-    private LucenePersistedStateFactory newPersistedStateFactory(NodeEnvironment nodeEnvironment) {
-        return new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry(),
+    private PersistedClusterStateService newPersistedClusterStateService(NodeEnvironment nodeEnvironment) {
+        return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(),
             usually()
                 ? BigArrays.NON_RECYCLING_INSTANCE
                 : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()));
@@ -74,46 +73,41 @@ private LucenePersistedStateFactory newPersistedStateFactory(NodeEnvironment nod

     public void testPersistsAndReloadsTerm() throws IOException {
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
-            final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment);
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
             final long newTerm = randomNonNegativeLong();

-            try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) {
-                assertThat(persistedState.getCurrentTerm(), equalTo(0L));
-                persistedState.setCurrentTerm(newTerm);
-                assertThat(persistedState.getCurrentTerm(), equalTo(newTerm));
+            assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(0L));
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+                writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE);
+                assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
             }

-            try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) {
-                assertThat(persistedState.getCurrentTerm(), equalTo(newTerm));
-            }
+            assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
         }
     }

     public void testPersistsAndReloadsGlobalMetadata() throws IOException {
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
-            final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment);
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
             final String clusterUUID = UUIDs.randomBase64UUID(random());
             final long version = randomLongBetween(1L, Long.MAX_VALUE);

-            try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) {
-                final ClusterState clusterState = persistedState.getLastAcceptedState();
-                persistedState.setLastAcceptedState(ClusterState.builder(clusterState)
+            ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService);
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+                writer.writeFullStateAndCommit(0L, ClusterState.builder(clusterState)
                     .metaData(MetaData.builder(clusterState.metaData())
                         .clusterUUID(clusterUUID)
                         .clusterUUIDCommitted(true)
                         .version(version))
                     .incrementVersion().build());
-                assertThat(persistedState.getLastAcceptedState().metaData().clusterUUID(), equalTo(clusterUUID));
-                assertTrue(persistedState.getLastAcceptedState().metaData().clusterUUIDCommitted());
-            }
-
-            try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) {
-                final ClusterState clusterState = persistedState.getLastAcceptedState();
+                clusterState = loadPersistedClusterState(persistedClusterStateService);
                 assertThat(clusterState.metaData().clusterUUID(), equalTo(clusterUUID));
                 assertTrue(clusterState.metaData().clusterUUIDCommitted());
                 assertThat(clusterState.metaData().version(), equalTo(version));
+            }

-                persistedState.setLastAcceptedState(ClusterState.builder(clusterState)
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+                writer.writeFullStateAndCommit(0L, ClusterState.builder(clusterState)
                     .metaData(MetaData.builder(clusterState.metaData())
                         .clusterUUID(clusterUUID)
                         .clusterUUIDCommitted(true)
@@ -121,12 +115,19 @@ public void testPersistsAndReloadsGlobalMetadata() throws IOException {
                     .incrementVersion().build());
             }

-            try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) {
-                final ClusterState clusterState = persistedState.getLastAcceptedState();
-                assertThat(clusterState.metaData().clusterUUID(), equalTo(clusterUUID));
-                assertTrue(clusterState.metaData().clusterUUIDCommitted());
-                assertThat(clusterState.metaData().version(), equalTo(version + 1));
-            }
+            clusterState = loadPersistedClusterState(persistedClusterStateService);
+            assertThat(clusterState.metaData().clusterUUID(), equalTo(clusterUUID));
+            assertTrue(clusterState.metaData().clusterUUIDCommitted());
+            assertThat(clusterState.metaData().version(), equalTo(version + 1));
+        }
+    }
+
+    private static void writeState(Writer writer, long currentTerm, ClusterState clusterState,
+                                   ClusterState previousState) throws IOException {
+        if (randomBoolean() || clusterState.term() != previousState.term() || writer.fullStateWritten == false) {
+            writer.writeFullStateAndCommit(currentTerm, clusterState);
+        } else {
+            writer.writeIncrementalStateAndCommit(currentTerm, previousState, clusterState);
         }
     }
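The `writeState` helper above randomizes between the two commit paths, but always falls back to a full write when the term changes or when nothing has been written yet (`writer.fullStateWritten == false`). That decision rule, isolated into a trivial sketch with the real state objects replaced by plain values:

```java
final class WriteMode {
    // Incremental commits are only meaningful within one term and after an initial
    // full write; otherwise a full write is required.
    static boolean mustWriteFull(long newTerm, long previousTerm, boolean fullStateWritten) {
        return newTerm != previousTerm || fullStateWritten == false;
    }
}
```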
ClusterState.builder(clusterState).version(staleVersion) .metaData(MetaData.builder(clusterState.metaData()).coordinationMetaData( - CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(staleTerm).build())).build()); + CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(staleTerm).build())).build(), + clusterState); } } try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[]{randomFrom(dataPaths)})) { unimportantPaths.remove(nodeEnvironment.nodeDataPaths()[0]); - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState( + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writeState(writer, freshTerm, ClusterState.builder(clusterState).version(freshVersion) .metaData(MetaData.builder(clusterState.metaData()).coordinationMetaData( - CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(freshTerm).build())).build()); + CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(freshTerm).build())).build(), + clusterState); } } @@ -169,20 +169,11 @@ public void testLoadsFreshestState() throws IOException { // verify that the freshest state is chosen try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - assertThat(persistedState.getLastAcceptedState().term(), equalTo(freshTerm)); - assertThat(persistedState.getLastAcceptedState().version(), equalTo(freshVersion)); - } - } - - // verify that the freshest state was rewritten to each data path - try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[]{randomFrom(dataPaths)})) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - assertThat(persistedState.getLastAcceptedState().term(), equalTo(freshTerm)); - assertThat(persistedState.getLastAcceptedState().version(), equalTo(freshVersion)); - } + final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService(nodeEnvironment) + .loadBestOnDiskState(); + final ClusterState clusterState = clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metaData); + assertThat(clusterState.term(), equalTo(freshTerm)); + assertThat(clusterState.version(), equalTo(freshVersion)); } } @@ -194,19 +185,19 @@ public void testFailsOnMismatchedNodeIds() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) { nodeIds[0] = nodeEnvironment.nodeId(); - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - persistedState.setLastAcceptedState( - ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writer.writeFullStateAndCommit(0L, + ClusterState.builder(clusterState).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); } } try (NodeEnvironment nodeEnvironment = 
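
testLoadsFreshestState seeds every data path with a stale term and version, then writes a fresher pair to a single path, and expects the loader to surface the fresh copy. The ordering the assertions rely on can be sketched as a comparator; this is an illustration of the rule under test, not the production selection code (field names follow the OnDiskState members used elsewhere in this diff):

```java
import java.util.Comparator;
import org.elasticsearch.gateway.PersistedClusterStateService.OnDiskState;

// "Freshest" means: the higher current term wins, and ties break on the
// higher last-accepted cluster state version.
static final Comparator<OnDiskState> BY_FRESHNESS =
    Comparator.comparingLong((OnDiskState s) -> s.currentTerm)
        .thenComparingLong(s -> s.lastAcceptedVersion);
```
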
newNodeEnvironment(dataPaths2)) { nodeIds[1] = nodeEnvironment.nodeId(); - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - persistedState.setLastAcceptedState( - ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writer.writeFullStateAndCommit(0L, + ClusterState.builder(clusterState).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); } } @@ -218,7 +209,7 @@ public void testFailsOnMismatchedNodeIds() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { final String message = expectThrows(IllegalStateException.class, - () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); assertThat(message, allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), @@ -236,17 +227,18 @@ public void testFailsOnMismatchedCommittedClusterUUIDs() throws IOException { // first establish consistent node IDs and write initial metadata try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - assertFalse(persistedState.getLastAcceptedState().metaData().clusterUUIDCommitted()); + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + assertFalse(clusterState.metaData().clusterUUIDCommitted()); + writer.writeFullStateAndCommit(0L, clusterState); } } try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + assertFalse(clusterState.metaData().clusterUUIDCommitted()); + writer.writeFullStateAndCommit(0L, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .clusterUUID(clusterUUID1) .clusterUUIDCommitted(true) @@ -256,10 +248,10 @@ public void testFailsOnMismatchedCommittedClusterUUIDs() throws IOException { } try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + 
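
testFailsOnMismatchedNodeIds above writes two sets of data paths under different node IDs and expects a combined load to refuse. A hypothetical sketch of such a guard; readNodeId and expectedNodeId are illustrative names, and only the IllegalStateException and its "unexpected node ID in metadata" wording come from the test:

```java
// Hypothetical guard, illustrative names: every data path must have been
// written by the node that is now loading it.
for (Path dataPath : dataPaths) {
    final String nodeIdOnDisk = readNodeId(dataPath); // readNodeId is a made-up helper
    if (expectedNodeId.equals(nodeIdOnDisk) == false) {
        throw new IllegalStateException("unexpected node ID in metadata: found [" + nodeIdOnDisk
            + "] in [" + dataPath + "] but expected [" + expectedNodeId + "]");
    }
}
```
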
assertFalse(clusterState.metaData().clusterUUIDCommitted()); + writer.writeFullStateAndCommit(0L, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .clusterUUID(clusterUUID2) .clusterUUIDCommitted(true) @@ -270,7 +262,7 @@ public void testFailsOnMismatchedCommittedClusterUUIDs() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { final String message = expectThrows(IllegalStateException.class, - () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); assertThat(message, allOf(containsString("mismatched cluster UUIDs in metadata"), containsString(clusterUUID1), containsString(clusterUUID2))); assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths1), @@ -294,40 +286,41 @@ public void testFailsIfFreshestStateIsInStaleTerm() throws IOException { final long staleVersion = staleTerm == freshTerm ? randomLongBetween(1L, freshVersion - 1) : randomLongBetween(1L, Long.MAX_VALUE); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setCurrentTerm(staleCurrentTerm); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + assertFalse(clusterState.metaData().clusterUUIDCommitted()); + writeState(writer, staleCurrentTerm, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()).version(1) .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(staleTerm).build())) .version(staleVersion) - .build()); + .build(), + clusterState); } } try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - persistedState.setCurrentTerm(freshCurrentTerm); + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writeState(writer, freshCurrentTerm, clusterState, clusterState); } } try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService(nodeEnvironment) + .loadBestOnDiskState(); + final ClusterState clusterState = clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metaData); + writeState(writer, onDiskState.currentTerm, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()).version(2) 
.coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(freshTerm).build())) .version(freshVersion) - .build()); + .build(), clusterState); } } try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { final String message = expectThrows(IllegalStateException.class, - () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); assertThat(message, allOf( containsString("inconsistent terms found"), containsString(Long.toString(staleCurrentTerm)), @@ -343,8 +336,8 @@ public void testFailsGracefullyOnExceptionDuringFlush() throws IOException { final AtomicBoolean throwException = new AtomicBoolean(); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { - final LucenePersistedStateFactory persistedStateFactory - = new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + final PersistedClusterStateService persistedClusterStateService + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { @Override Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @@ -358,11 +351,10 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti }; } }; - final long newTerm = randomNonNegativeLong(); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - persistedState.setCurrentTerm(newTerm); - final ClusterState clusterState = persistedState.getLastAcceptedState(); + try (Writer writer = persistedClusterStateService.createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + final long newTerm = randomNonNegativeLong(); final ClusterState newState = ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .clusterUUID(UUIDs.randomBase64UUID(random())) @@ -370,7 +362,8 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti .version(randomLongBetween(1L, Long.MAX_VALUE))) .incrementVersion().build(); throwException.set(true); - assertThat(expectThrows(UncheckedIOException.class, () -> persistedState.setLastAcceptedState(newState)).getMessage(), + assertThat(expectThrows(IOException.class, () -> + writeState(writer, newTerm, newState, clusterState)).getMessage(), containsString("simulated")); } } @@ -380,8 +373,8 @@ public void testThrowsIOErrorOnExceptionDuringCommit() throws IOException { final AtomicBoolean throwException = new AtomicBoolean(); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { - final LucenePersistedStateFactory persistedStateFactory - = new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + final PersistedClusterStateService persistedClusterStateService + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { @Override Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @@ -394,11 +387,10 @@ public void sync(Collection<String> names) throws IOException { }; } }; - final long newTerm = randomNonNegativeLong(); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - persistedState.setCurrentTerm(newTerm); - final
ClusterState clusterState = persistedState.getLastAcceptedState(); + try (Writer writer = persistedClusterStateService.createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + final long newTerm = randomNonNegativeLong(); final ClusterState newState = ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .clusterUUID(UUIDs.randomBase64UUID(random())) @@ -406,7 +398,7 @@ public void sync(Collection<String> names) throws IOException { .version(randomLongBetween(1L, Long.MAX_VALUE))) .incrementVersion().build(); throwException.set(true); - assertThat(expectThrows(IOError.class, () -> persistedState.setLastAcceptedState(newState)).getMessage(), + assertThat(expectThrows(IOError.class, () -> writeState(writer, newTerm, newState, clusterState)).getMessage(), containsString("simulated")); } } @@ -417,14 +409,14 @@ public void testFailsIfGlobalMetadataIsMissing() throws IOException { // isn't there any more try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - persistedState.setLastAcceptedState( - ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writeState(writer, 0L, ClusterState.builder(clusterState).version(randomLongBetween(1L, Long.MAX_VALUE)).build(), + clusterState); } final Path brokenPath = randomFrom(nodeEnvironment.nodeDataPaths()); - try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME))) { + try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) { final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { @@ -433,7 +425,7 @@ public void testFailsIfGlobalMetadataIsMissing() throws IOException { final String message = expectThrows(IllegalStateException.class, - () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); assertThat(message, allOf(containsString("no global metadata found"), containsString(brokenPath.toString()))); } } @@ -447,16 +439,16 @@ public void testFailsIfGlobalMetadataIsDuplicated() throws IOException { final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - persistedState.setLastAcceptedState( - ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writeState(writer, 0L, ClusterState.builder(clusterState).version(randomLongBetween(1L,
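
Both fault-injection tests above share one pattern: createDirectory is overridden so the service writes through a FilterDirectory that starts failing once a flag flips. A condensed sketch of the flush-time variant, assuming a realDirectory to wrap; the commit-time test throws from sync instead, which the writer escalates to an IOError because a partially synced commit can no longer be trusted:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

final AtomicBoolean throwException = new AtomicBoolean();
final Directory faultyDirectory = new FilterDirectory(realDirectory) { // realDirectory: the directory to wrap
    @Override
    public IndexOutput createOutput(String name, IOContext context) throws IOException {
        if (throwException.get()) {
            throw new IOException("simulated"); // surfaces while the writer flushes documents
        }
        return super.createOutput(name, context);
    }
};
```
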
Long.MAX_VALUE)).build(), + clusterState); } final Path brokenPath = randomFrom(nodeEnvironment.nodeDataPaths()); final Path dupPath = randomValueOtherThan(brokenPath, () -> randomFrom(nodeEnvironment.nodeDataPaths())); - try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME)); - Directory dupDirectory = new SimpleFSDirectory(dupPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME))) { + try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME)); + Directory dupDirectory = new SimpleFSDirectory(dupPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) { try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { indexWriter.addIndexes(dupDirectory); indexWriter.commit(); @@ -464,7 +456,7 @@ public void testFailsIfGlobalMetadataIsDuplicated() throws IOException { } final String message = expectThrows(IllegalStateException.class, - () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); assertThat(message, allOf(containsString("duplicate global metadata found"), containsString(brokenPath.toString()))); } } @@ -481,27 +473,27 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { final String indexUUID = UUIDs.randomBase64UUID(random()); final String indexName = randomAlphaOfLength(10); - try (CoordinationState.PersistedState persistedState - = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) - .metaData(MetaData.builder(clusterState.metaData()) - .version(1L) - .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(1L).build()) - .put(IndexMetaData.builder(indexName) + try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(newPersistedClusterStateService(nodeEnvironment)); + writeState(writer, 0L, ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) .version(1L) - .settings(Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) - .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) - .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID)))) - .incrementVersion().build()); + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(1L).build()) + .put(IndexMetaData.builder(indexName) + .version(1L) + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID)))) + .incrementVersion().build(), + clusterState); } final Path brokenPath = randomFrom(nodeEnvironment.nodeDataPaths()); final Path dupPath = randomValueOtherThan(brokenPath, () -> randomFrom(nodeEnvironment.nodeDataPaths())); - try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME)); - Directory dupDirectory = new 
SimpleFSDirectory(dupPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME))) { + try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME)); + Directory dupDirectory = new SimpleFSDirectory(dupPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) { try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { indexWriter.deleteDocuments(new Term("type", "global")); // do not duplicate global metadata indexWriter.addIndexes(dupDirectory); @@ -510,7 +502,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { } final String message = expectThrows(IllegalStateException.class, - () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); assertThat(message, allOf( containsString("duplicate metadata found"), containsString(brokenPath.toString()), @@ -521,7 +513,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException { public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { - final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); final long globalVersion = randomLongBetween(1L, Long.MAX_VALUE); final String indexUUID = UUIDs.randomBase64UUID(random()); final long indexMetaDataVersion = randomLongBetween(1L, Long.MAX_VALUE); @@ -529,91 +521,85 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws final long oldTerm = randomLongBetween(1L, Long.MAX_VALUE - 1); final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) - .metaData(MetaData.builder(clusterState.metaData()) - .version(globalVersion) - .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(oldTerm).build()) - .put(IndexMetaData.builder("test") - .version(indexMetaDataVersion - 1) // -1 because it's incremented in .put() - .settings(Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) - .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) - .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID)))) - .incrementVersion().build()); - } + try (Writer writer = persistedClusterStateService.createWriter()) { + ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + writeState(writer, 0L, ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(globalVersion) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(oldTerm).build()) + .put(IndexMetaData.builder("test") + .version(indexMetaDataVersion - 1) // -1 because it's incremented in .put() + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + 
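
The duplicate-metadata tests corrupt the on-disk state by bulk-appending one path's metadata index into another's with IndexWriter.addIndexes, so the loader then sees the same documents twice. The recipe, consolidated from the hunks above (the deleteDocuments call is only needed in the index-metadata variant, to keep the global document unique):

```java
try (Directory directory = new SimpleFSDirectory(
         brokenPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME));
     Directory dupDirectory = new SimpleFSDirectory(
         dupPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) {
    try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
        indexWriter.deleteDocuments(new Term("type", "global")); // only in the index-metadata variant
        indexWriter.addIndexes(dupDirectory); // the same metadata documents now exist twice
        indexWriter.commit();
    }
}
```
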
.put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID)))) + .incrementVersion().build(), + clusterState); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + + clusterState = loadPersistedClusterState(persistedClusterStateService); + IndexMetaData indexMetaData = clusterState.metaData().index("test"); assertThat(indexMetaData.getIndexUUID(), equalTo(indexUUID)); assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion)); assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(0)); - // ensure we do not wastefully persist the same index metadata version by making a bad update with the same version - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) - .metaData(MetaData.builder(clusterState.metaData()) - .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() + writer.writeIncrementalStateAndCommit(0L, clusterState, ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() .put(indexMetaData.getSettings()) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).build(), false)) - .incrementVersion().build()); - } + .incrementVersion().build()); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + clusterState = loadPersistedClusterState(persistedClusterStateService); + indexMetaData = clusterState.metaData().index("test"); + assertThat(indexMetaData.getIndexUUID(), equalTo(indexUUID)); assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion)); assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(0)); - // ensure that we do persist the same index metadata version by making an update with a higher version - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) - .metaData(MetaData.builder(clusterState.metaData()) - .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() - .put(indexMetaData.getSettings()) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)).build(), true)) - .incrementVersion().build()); - } + writeState(writer, 0L, ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() + .put(indexMetaData.getSettings()) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)).build(), true)) + .incrementVersion().build(), + clusterState); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + clusterState = loadPersistedClusterState(persistedClusterStateService); + indexMetaData = clusterState.metaData().index("test"); assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion + 1)); assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(2)); - // ensure 
that we also persist the index metadata when the term changes - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) - .metaData(MetaData.builder(clusterState.metaData()) - .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(newTerm).build()) - .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() - .put(indexMetaData.getSettings()) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 3)).build(), false)) - .incrementVersion().build()); + writeState(writer, 0L, ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(newTerm).build()) + .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() + .put(indexMetaData.getSettings()) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 3)).build(), false)) + .incrementVersion().build(), + clusterState); } - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - final IndexMetaData indexMetaData = clusterState.metaData().index("test"); - assertThat(indexMetaData.getIndexUUID(), equalTo(indexUUID)); - assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion + 1)); - assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(3)); - } + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + assertThat(indexMetaData.getIndexUUID(), equalTo(indexUUID)); + assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion + 1)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(3)); } } public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { - final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); final long term = randomLongBetween(1L, Long.MAX_VALUE); final String addedIndexUuid = UUIDs.randomBase64UUID(random()); final String updatedIndexUuid = UUIDs.randomBase64UUID(random()); final String deletedIndexUuid = UUIDs.randomBase64UUID(random()); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + try (Writer writer = persistedClusterStateService.createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + writeState(writer, 0L, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .version(clusterState.metaData().version() + 1) .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(term).build()) @@ -631,11 +617,12 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) .put(IndexMetaData.SETTING_INDEX_UUID, deletedIndexUuid)))) - 
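
Stepping back, testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges above pins down when an index's metadata document is re-persisted: an update carrying the same index metadata version must not be rewritten, while a higher version or a changed coordination term must be. A hypothetical predicate restating that invariant:

```java
// Hypothetical restatement of the invariant under test: rewrite an index's
// metadata document iff its version moved or the coordination term did.
static boolean shouldRewriteIndexMetaData(IndexMetaData before, IndexMetaData after,
                                          long previousTerm, long currentTerm) {
    return after.getVersion() != before.getVersion() || previousTerm != currentTerm;
}
```
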
.incrementVersion().build()); + .incrementVersion().build(), + clusterState); } - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); + try (Writer writer = persistedClusterStateService.createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); assertThat(clusterState.metaData().indices().size(), equalTo(2)); assertThat(clusterState.metaData().index("updated").getIndexUUID(), equalTo(updatedIndexUuid)); @@ -643,7 +630,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc equalTo(1)); assertThat(clusterState.metaData().index("deleted").getIndexUUID(), equalTo(deletedIndexUuid)); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + writeState(writer, 0L, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .version(clusterState.metaData().version() + 1) .remove("deleted") @@ -658,35 +645,34 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) .put(IndexMetaData.SETTING_INDEX_UUID, addedIndexUuid)))) - .incrementVersion().build()); + .incrementVersion().build(), + clusterState); } - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); - assertThat(clusterState.metaData().indices().size(), equalTo(2)); - assertThat(clusterState.metaData().index("updated").getIndexUUID(), equalTo(updatedIndexUuid)); - assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(clusterState.metaData().index("updated").getSettings()), - equalTo(2)); - assertThat(clusterState.metaData().index("added").getIndexUUID(), equalTo(addedIndexUuid)); - assertThat(clusterState.metaData().index("deleted"), nullValue()); - } + assertThat(clusterState.metaData().indices().size(), equalTo(2)); + assertThat(clusterState.metaData().index("updated").getIndexUUID(), equalTo(updatedIndexUuid)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(clusterState.metaData().index("updated").getSettings()), + equalTo(2)); + assertThat(clusterState.metaData().index("added").getIndexUUID(), equalTo(addedIndexUuid)); + assertThat(clusterState.metaData().index("deleted"), nullValue()); } } public void testReloadsMetadataAcrossMultipleSegments() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { - final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); final int writes = between(5, 20); final List<Index> indices = new ArrayList<>(writes); - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + try (Writer writer = persistedClusterStateService.createWriter()) { for (int i = 0; i < writes; i++) { final Index index = new Index("test-" + i, UUIDs.randomBase64UUID(random())); indices.add(index); - final ClusterState clusterState = persistedState.getLastAcceptedState(); - persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + final
ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + writeState(writer, 0L, ClusterState.builder(clusterState) .metaData(MetaData.builder(clusterState.metaData()) .version(i + 2) .put(IndexMetaData.builder(index.getName()) @@ -695,16 +681,15 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException { .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())))) - .incrementVersion().build()); + .incrementVersion().build(), + clusterState); } } - try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { - final ClusterState clusterState = persistedState.getLastAcceptedState(); - for (Index index : indices) { - final IndexMetaData indexMetaData = clusterState.metaData().index(index.getName()); - assertThat(indexMetaData.getIndexUUID(), equalTo(index.getUUID())); - } + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + for (Index index : indices) { + final IndexMetaData indexMetaData = clusterState.metaData().index(index.getName()); + assertThat(indexMetaData.getIndexUUID(), equalTo(index.getUUID())); } } } @@ -731,10 +716,9 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException .build()); } - private static CoordinationState.PersistedState loadPersistedState(LucenePersistedStateFactory persistedStateFactory) - throws IOException { - - return persistedStateFactory.loadPersistedState(LucenePersistedStateFactoryTests::clusterStateFromMetadata); + private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException { + final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(); + return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metaData); } private static ClusterState clusterStateFromMetadata(long version, MetaData metaData) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java index 32f999a8e91d6..c4c620a74a826 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -28,7 +28,6 @@ import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -102,7 +101,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO remove corrupted shard tool @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase { @@ -157,8 +155,8 @@ public void testCorruptIndex() throws Exception { fail("expected the command to fail as node is locked"); } catch (Exception e) { assertThat(e.getMessage(), - allOf(containsString("Failed 
to lock node's directory"), - containsString("is Elasticsearch still running ?"))); + allOf(containsString("failed to lock node's directory"), + containsString("is Elasticsearch still running?"))); } final Path indexDir = getPathToShardData(indexName, ShardPath.INDEX_FOLDER_NAME); @@ -575,8 +573,8 @@ public void testResolvePath() throws Exception { for (String nodeName : nodeNames) { final Path indexPath = indexPathByNodeName.get(nodeName); final OptionSet options = parser.parse("--dir", indexPath.toAbsolutePath().toString()); - command.findAndProcessShardPath(options, environmentByNodeName.get(nodeName), - shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); + command.findAndProcessShardPath(options, environmentByNodeName.get(nodeName), environmentByNodeName.get(nodeName).dataFiles(), + state, shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index df7423fc8577e..f84a41131717d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -30,19 +30,26 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cli.Terminal; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.NodeMetaData; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.EngineException; @@ -60,6 +67,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; +import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -80,7 +88,9 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { private Environment environment; private ShardPath shardPath; private IndexMetaData indexMetaData; + private ClusterState clusterState; private IndexShard indexShard; + private Path[] dataPaths; private Path translogPath; private Path indexPath; @@ -89,7 +99,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { @Before public void setup() throws IOException { - shardId = new ShardId("index0", "_na_", 0); + shardId = new ShardId("index0", UUIDs.randomBase64UUID(), 0); final String nodeId = 
randomAlphaOfLength(10); routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); @@ -103,11 +113,13 @@ public void setup() throws IOException { // create same directory structure as prod does Files.createDirectories(dataDir); + dataPaths = new Path[] {dataDir}; final Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_INDEX_UUID, shardId.getIndex().getUUID()) .build(); final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(dataDir); @@ -135,6 +147,19 @@ public void setup() throws IOException { .putMapping("{ \"properties\": {} }"); indexMetaData = metaData.build(); + clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder().put(indexMetaData, false).build()).build(); + + try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) { + final Path[] dataPaths = + Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new); + NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, Version.CURRENT), dataPaths); + + try (PersistedClusterStateService.Writer writer = + ElasticsearchNodeCommand.createPersistedClusterStateService(dataPaths).createWriter()) { + writer.writeFullStateAndCommit(1L, clusterState); + } + } + indexShard = newStartedShard(p -> newShard(routing, shardPath, indexMetaData, null, null, new InternalEngineFactory(), () -> { }, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER), true); @@ -355,7 +380,6 @@ public void testResolveIndexDirectory() throws Exception { // index a single doc to have files on a disk indexDoc(indexShard, "_doc", "0", "{}"); flushShard(indexShard, true); - writeIndexState(); // close shard closeShards(indexShard); @@ -367,11 +391,11 @@ public void testResolveIndexDirectory() throws Exception { final OptionSet options = parser.parse("--index", shardId.getIndex().getName(), "--shard-id", Integer.toString(shardId.id())); - command.findAndProcessShardPath(options, environment, + command.findAndProcessShardPath(options, environment, dataPaths, clusterState, shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); final OptionSet options2 = parser.parse("--dir", indexPath.toAbsolutePath().toString()); - command.findAndProcessShardPath(options2, environment, + command.findAndProcessShardPath(options2, environment, dataPaths, clusterState, shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); } @@ -509,17 +533,7 @@ private int indexDocs(IndexShard indexShard, boolean flushLast) throws IOExcepti logger.info("--> indexed {} docs, {} to keep", numDocs, numDocsToKeep); - writeIndexState(); return numDocsToKeep; } - private void writeIndexState() throws IOException { - // create _state of IndexMetaData - try(NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment)) { - final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex()); - IndexMetaData.FORMAT.writeAndCleanup(indexMetaData, paths); - logger.info("--> index metadata persisted to {} ", Arrays.toString(paths)); - } - } - } diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java 
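
The setup change above replaces the old writeIndexState() helper: rather than writing legacy per-index _state files, the test now makes the data paths look like a real node's by pinning NodeMetaData and committing a full cluster state through the same service the command later reads. The same bootstrap, annotated:

```java
// Bootstrap the on-disk state a node command expects to find.
try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) {
    final Path[] paths = Arrays.stream(lock.getNodePaths())
        .filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
    // 1. pin the node ID and version, as a running node would have done
    NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, Version.CURRENT), paths);
    // 2. commit a cluster state that references the test index
    try (PersistedClusterStateService.Writer writer =
             ElasticsearchNodeCommand.createPersistedClusterStateService(paths).createWriter()) {
        writer.writeFullStateAndCommit(1L, clusterState);
    }
}
```
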
index 79bfd5512c2c2..25f43e28ca88c 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -20,10 +20,12 @@ package org.elasticsearch.gateway; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -33,6 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -68,7 +72,13 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont final ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()) .thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - start(settings, transportService, clusterService, - null, null, new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE)); + final MetaStateService metaStateService = mock(MetaStateService.class); + try { + when(metaStateService.loadFullState()).thenReturn(new Tuple<>(Manifest.empty(), MetaData.builder().build())); + } catch (IOException e) { + throw new AssertionError(e); + } + start(settings, transportService, clusterService, metaStateService, + null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE)); } }
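
Taken together, the consumer-side pattern throughout this patch is to construct a PersistedClusterStateService over the node's data paths and ask it for the best available on-disk state. A minimal sketch using only the constructor and OnDiskState accessors that appear in this diff:

```java
final PersistedClusterStateService service =
    new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE);
final PersistedClusterStateService.OnDiskState onDiskState = service.loadBestOnDiskState();
final long currentTerm = onDiskState.currentTerm;                 // replaces PersistedState.getCurrentTerm()
final long lastAcceptedVersion = onDiskState.lastAcceptedVersion; // version of the last-accepted cluster state
final MetaData metaData = onDiskState.metaData;                   // the metadata as last committed
```
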