diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Manifest.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Manifest.java new file mode 100644 index 0000000000000..252e590c26730 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Manifest.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * This class represents the manifest file, which is the entry point for reading meta data from disk. + * Metadata consists of global metadata and index metadata. + * When new version of metadata is written it's assigned some generation long value. + * Global metadata generation could be obtained by calling {@link #getGlobalGeneration()}. + * Index metadata generation could be obtained by calling {@link #getIndexGenerations()}. + */ +public class Manifest implements ToXContentFragment { + private static final long MISSING_GLOBAL_GENERATION = -1; + + private final long globalGeneration; + private final Map indexGenerations; + + public Manifest(long globalGeneration, Map indexGenerations) { + this.globalGeneration = globalGeneration; + this.indexGenerations = indexGenerations; + } + + /** + * Returns global metadata generation. + */ + public long getGlobalGeneration() { + return globalGeneration; + } + + /** + * Returns map from {@link Index} to index metadata generation. + */ + public Map getIndexGenerations() { + return indexGenerations; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Manifest manifest = (Manifest) o; + return globalGeneration == manifest.globalGeneration && + Objects.equals(indexGenerations, manifest.indexGenerations); + } + + @Override + public int hashCode() { + return Objects.hash(globalGeneration, indexGenerations); + } + + private static final String MANIFEST_FILE_PREFIX = "manifest-"; + private static final ToXContent.Params MANIFEST_FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("binary", "true")); + + public static final MetaDataStateFormat FORMAT = new MetaDataStateFormat(MANIFEST_FILE_PREFIX) { + + @Override + public void toXContent(XContentBuilder builder, Manifest state) throws IOException { + state.toXContent(builder, MANIFEST_FORMAT_PARAMS); + } + + @Override + public Manifest fromXContent(XContentParser parser) throws IOException { + return Manifest.fromXContent(parser); + } + }; + + + /* + * Code below this comment is for XContent parsing/generation + */ + + private static final ParseField GENERATION_PARSE_FIELD = new ParseField("generation"); + private static final ParseField INDEX_GENERATIONS_PARSE_FIELD = new ParseField("index_generations"); + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(GENERATION_PARSE_FIELD.getPreferredName(), globalGeneration); + builder.array(INDEX_GENERATIONS_PARSE_FIELD.getPreferredName(), indexEntryList().toArray()); + return builder; + } + + private List indexEntryList() { + return indexGenerations.entrySet().stream(). + map(entry -> new IndexEntry(entry.getKey(), entry.getValue())). + collect(Collectors.toList()); + } + + private static long generation(Object[] generationAndListOfIndexEntries) { + return (Long) generationAndListOfIndexEntries[0]; + } + + private static Map indices(Object[] generationAndListOfIndexEntries) { + List listOfIndices = (List) generationAndListOfIndexEntries[1]; + return listOfIndices.stream().collect(Collectors.toMap(IndexEntry::getIndex, IndexEntry::getGeneration)); + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "manifest", + generationAndListOfIndexEntries -> + new Manifest(generation(generationAndListOfIndexEntries), indices(generationAndListOfIndexEntries))); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_PARSE_FIELD); + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), IndexEntry.INDEX_ENTRY_PARSER, INDEX_GENERATIONS_PARSE_FIELD); + } + + public static Manifest fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public boolean isEmpty() { + return globalGeneration == MISSING_GLOBAL_GENERATION && indexGenerations.isEmpty(); + } + + public static Manifest empty() { + return new Manifest(MISSING_GLOBAL_GENERATION, Collections.emptyMap()); + } + + public boolean isGlobalGenerationMissing() { + return globalGeneration == MISSING_GLOBAL_GENERATION; + } + + private static final class IndexEntry implements ToXContentFragment { + private static final ParseField INDEX_GENERATION_PARSE_FIELD = new ParseField("generation"); + private static final ParseField INDEX_PARSE_FIELD = new ParseField("index"); + + static final ConstructingObjectParser INDEX_ENTRY_PARSER = new ConstructingObjectParser<>( + "indexEntry", + indexAndGeneration -> new IndexEntry((Index) indexAndGeneration[0], (long) indexAndGeneration[1])); + + static { + INDEX_ENTRY_PARSER.declareField(ConstructingObjectParser.constructorArg(), + Index::fromXContent, INDEX_PARSE_FIELD, ObjectParser.ValueType.OBJECT); + INDEX_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_GENERATION_PARSE_FIELD); + } + + private final long generation; + private final Index index; + + IndexEntry(Index index, long generation) { + this.index = index; + this.generation = generation; + } + + public long getGeneration() { + return generation; + } + + public Index getIndex() { + return index; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(INDEX_PARSE_FIELD.getPreferredName(), index); + builder.field(GENERATION_PARSE_FIELD.getPreferredName(), generation); + builder.endObject(); + return builder; + } + } +} + diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index ff8baaabb443c..f07e5da437d67 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -390,7 +390,7 @@ private static NodeMetaData loadOrCreateNodeMetaData(Settings settings, Logger l metaData = new NodeMetaData(generateNodeId(settings)); } // we write again to make sure all paths have the latest state file - NodeMetaData.FORMAT.write(metaData, paths); + NodeMetaData.FORMAT.writeAndCleanup(metaData, paths); return metaData; } diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 9bbb5af5bf028..62c2c56acac19 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -20,11 +20,14 @@ package org.elasticsearch.gateway; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -32,7 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; @@ -54,133 +57,253 @@ import java.util.function.Consumer; import java.util.function.UnaryOperator; -import static java.util.Collections.emptySet; -import static java.util.Collections.unmodifiableSet; - -public class GatewayMetaState extends AbstractComponent implements ClusterStateApplier { +/** + * This class is responsible for storing/retrieving metadata to/from disk. + * When instance of this class is created, constructor ensures that this version is compatible with state stored on disk and performs + * state upgrade if necessary. Also it checks that atomic move is supported on the filesystem level, because it's a must for metadata + * store algorithm. + * Please note that the state being loaded when constructing the instance of this class is NOT the state that will be used as a + * {@link ClusterState#metaData()}. Instead when node is starting up, it calls {@link #loadMetaData()} method and if this node is + * elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the + * gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster. + * It means that the first time {@link #applyClusterState(ClusterChangedEvent)} method is called, it won't have any previous metaData in + * memory and will iterate over all the indices in received {@link ClusterState} and store them to disk. + */ +public class GatewayMetaState implements ClusterStateApplier { + private static final Logger logger = LogManager.getLogger(GatewayMetaState.class); private final NodeEnvironment nodeEnv; private final MetaStateService metaStateService; + private final Settings settings; @Nullable - private volatile MetaData previousMetaData; - - private volatile Set previouslyWrittenIndices = emptySet(); + //there is a single thread executing applyClusterState calls, hence no volatile modifier + private Manifest previousManifest; + private MetaData previousMetaData; public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) throws IOException { + this.settings = settings; this.nodeEnv = nodeEnv; this.metaStateService = metaStateService; - if (DiscoveryNode.isDataNode(settings)) { - ensureNoPre019ShardState(nodeEnv); - } + ensureNoPre019State(); //TODO remove this check, it's Elasticsearch version 7 already + ensureAtomicMoveSupported(); //TODO move this check to NodeEnvironment, because it's related to all types of metadata + upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader); + profileLoadMetaData(); + } - if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) { - nodeEnv.ensureAtomicMoveSupported(); + private void profileLoadMetaData() throws IOException { + if (isMasterOrDataNode()) { + long startNS = System.nanoTime(); + metaStateService.loadFullState(); + logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); } - if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) { + } + + private void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) + throws IOException { + if (isMasterOrDataNode()) { try { - ensureNoPre019State(); - final MetaData metaData = metaStateService.loadFullState(); - final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader); + final Tuple metaStateAndData = metaStateService.loadFullState(); + final Manifest manifest = metaStateAndData.v1(); + final MetaData metaData = metaStateAndData.v2(); + // We finished global state validation and successfully checked all indices for backward compatibility // and found no non-upgradable indices, which means the upgrade can continue. // Now it's safe to overwrite global and index metadata. - if (metaData != upgradedMetaData) { - if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) { - metaStateService.writeGlobalState("upgrade", upgradedMetaData); - } - for (IndexMetaData indexMetaData : upgradedMetaData) { - if (metaData.hasIndexMetaData(indexMetaData) == false) { - metaStateService.writeIndex("upgrade", indexMetaData); - } + // We don't re-write metadata if it's not upgraded by upgrade plugins, because + // if there is manifest file, it means metadata is properly persisted to all data paths + // if there is no manifest file (upgrade from 6.x to 7.x) metadata might be missing on some data paths, + // but anyway we will re-write it as soon as we receive first ClusterState + final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, manifest); + final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader); + + final long globalStateGeneration; + if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) { + globalStateGeneration = writer.writeGlobalState("upgrade", upgradedMetaData); + } else { + globalStateGeneration = manifest.getGlobalGeneration(); + } + + Map indices = new HashMap<>(manifest.getIndexGenerations()); + for (IndexMetaData indexMetaData : upgradedMetaData) { + if (metaData.hasIndexMetaData(indexMetaData) == false) { + final long generation = writer.writeIndex("upgrade", indexMetaData); + indices.put(indexMetaData.getIndex(), generation); } } - long startNS = System.nanoTime(); - metaStateService.loadFullState(); - logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); + + final Manifest newManifest = new Manifest(globalStateGeneration, indices); + writer.writeManifestAndCleanup("startup", newManifest); } catch (Exception e) { - logger.error("failed to read local state, exiting...", e); + logger.error("failed to read or upgrade local state, exiting...", e); throw e; } } } - public MetaData loadMetaState() throws IOException { - return metaStateService.loadFullState(); + private boolean isMasterOrDataNode() { + return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings); + } + + private void ensureAtomicMoveSupported() throws IOException { + if (isMasterOrDataNode()) { + nodeEnv.ensureAtomicMoveSupported(); + } + } + + public MetaData loadMetaData() throws IOException { + return metaStateService.loadFullState().v2(); } @Override public void applyClusterState(ClusterChangedEvent event) { + if (isMasterOrDataNode() == false) { + return; + } - final ClusterState state = event.state(); - if (state.blocks().disableStatePersistence()) { - // reset the current metadata, we need to start fresh... - this.previousMetaData = null; - previouslyWrittenIndices = emptySet(); + if (event.state().blocks().disableStatePersistence()) { + // reset the current state, we need to start fresh... + previousMetaData = null; + previousManifest = null; return; } - MetaData newMetaData = state.metaData(); - // we don't check if metaData changed, since we might be called several times and we need to check dangling... - Set relevantIndices = Collections.emptySet(); - boolean success = true; - // write the state if this node is a master eligible node or if it is a data node and has shards allocated on it - if (state.nodes().getLocalNode().isMasterNode() || state.nodes().getLocalNode().isDataNode()) { - if (previousMetaData == null) { - try { - // we determine if or if not we write meta data on data only nodes by looking at the shard routing - // and only write if a shard of this index is allocated on this node - // however, closed indices do not appear in the shard routing. if the meta data for a closed index is - // updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state - // persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode(). - // we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list - if (isDataOnlyNode(state)) { - Set newPreviouslyWrittenIndices = new HashSet<>(previouslyWrittenIndices.size()); - for (IndexMetaData indexMetaData : newMetaData) { - IndexMetaData indexMetaDataOnDisk = null; - if (indexMetaData.getState().equals(IndexMetaData.State.CLOSE)) { - indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.getIndex()); - } - if (indexMetaDataOnDisk != null) { - newPreviouslyWrittenIndices.add(indexMetaDataOnDisk.getIndex()); - } - } - newPreviouslyWrittenIndices.addAll(previouslyWrittenIndices); - previouslyWrittenIndices = unmodifiableSet(newPreviouslyWrittenIndices); - } - } catch (Exception e) { - success = false; - } - } - // check if the global state changed? - if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) { - try { - metaStateService.writeGlobalState("changed", newMetaData); - } catch (Exception e) { - success = false; - } + try { + if (previousManifest == null) { + previousManifest = metaStateService.loadManifestOrEmpty(); } + updateMetaData(event); + } catch (Exception e) { + logger.warn("Exception occurred when storing new meta data", e); + } + } + /** + * This class is used to write changed global {@link MetaData}, {@link IndexMetaData} and {@link Manifest} to disk. + * This class delegates write* calls to corresponding write calls in {@link MetaStateService} and + * additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails. + */ + static class AtomicClusterStateWriter { + private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished"; + private final List commitCleanupActions; + private final List rollbackCleanupActions; + private final Manifest previousManifest; + private final MetaStateService metaStateService; + private boolean finished; + + AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) { + this.metaStateService = metaStateService; + assert previousManifest != null; + this.previousManifest = previousManifest; + this.commitCleanupActions = new ArrayList<>(); + this.rollbackCleanupActions = new ArrayList<>(); + this.finished = false; + } + + long writeGlobalState(String reason, MetaData metaData) throws WriteStateException { + assert finished == false : FINISHED_MSG; + try { + rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration())); + long generation = metaStateService.writeGlobalState(reason, metaData); + commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation)); + return generation; + } catch (WriteStateException e) { + rollback(); + throw e; + } + } - relevantIndices = getRelevantIndices(event.state(), event.previousState(), previouslyWrittenIndices); - final Iterable writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, - previousMetaData, event.state().metaData()); - // check and write changes in indices - for (IndexMetaWriteInfo indexMetaWrite : writeInfo) { - try { - metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData); - } catch (Exception e) { - success = false; + long writeIndex(String reason, IndexMetaData metaData) throws WriteStateException { + assert finished == false : FINISHED_MSG; + try { + Index index = metaData.getIndex(); + Long previousGeneration = previousManifest.getIndexGenerations().get(index); + if (previousGeneration != null) { + // we prefer not to clean-up index metadata in case of rollback, + // if it's not referenced by previous manifest file + // not to break dangling indices functionality + rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration)); } + long generation = metaStateService.writeIndex(reason, metaData); + commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation)); + return generation; + } catch (WriteStateException e) { + rollback(); + throw e; + } + } + + long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException { + assert finished == false : FINISHED_MSG; + try { + long generation = metaStateService.writeManifestAndCleanup(reason, manifest); + commitCleanupActions.forEach(Runnable::run); + finished = true; + return generation; + } catch (WriteStateException e) { + rollback(); + throw e; } } - if (success) { - previousMetaData = newMetaData; - previouslyWrittenIndices = unmodifiableSet(relevantIndices); + void rollback() { + rollbackCleanupActions.forEach(Runnable::run); + finished = true; + } + } + + /** + * Updates meta state and meta data on disk according to {@link ClusterChangedEvent}. + * + * @throws IOException if IOException occurs. It's recommended for the callers of this method to handle {@link WriteStateException}, + * which is subclass of {@link IOException} explicitly. See also {@link WriteStateException#isDirty()}. + */ + private void updateMetaData(ClusterChangedEvent event) throws IOException { + ClusterState newState = event.state(); + ClusterState previousState = event.previousState(); + MetaData newMetaData = newState.metaData(); + + final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest); + long globalStateGeneration = writeGlobalState(writer, newMetaData); + Map indexGenerations = writeIndicesMetadata(writer, newState, previousState); + Manifest manifest = new Manifest(globalStateGeneration, indexGenerations); + writeManifest(writer, manifest); + + previousMetaData = newMetaData; + previousManifest = manifest; + } + + private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws IOException { + if (manifest.equals(previousManifest) == false) { + writer.writeManifestAndCleanup("changed", manifest); + } + } + + private Map writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState) + throws IOException { + Map previouslyWrittenIndices = previousManifest.getIndexGenerations(); + Set relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet()); + + Map newIndices = new HashMap<>(); + + Iterable actions = resolveIndexMetaDataActions(previouslyWrittenIndices, relevantIndices, previousMetaData, + newState.metaData()); + + for (IndexMetaDataAction action : actions) { + long generation = action.execute(writer); + newIndices.put(action.getIndex(), generation); + } + + return newIndices; + } + + private long writeGlobalState(AtomicClusterStateWriter writer, MetaData newMetaData) throws IOException { + if (previousMetaData == null || MetaData.isGlobalStateEquals(previousMetaData, newMetaData) == false) { + return writer.writeGlobalState("changed", newMetaData); } + return previousManifest.getGlobalGeneration(); } public static Set getRelevantIndices(ClusterState state, ClusterState previousState, Set previouslyWrittenIndices) { @@ -196,14 +319,24 @@ public static Set getRelevantIndices(ClusterState state, ClusterState pre } - protected static boolean isDataOnlyNode(ClusterState state) { + private static boolean isDataOnlyNode(ClusterState state) { return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode()); } + + private void ensureNoPre019State() throws IOException { + if (DiscoveryNode.isDataNode(settings)) { + ensureNoPre019ShardState(); + } + if (isMasterOrDataNode()) { + ensureNoPre019MetadataFiles(); + } + } + /** * Throws an IAE if a pre 0.19 state is detected */ - private void ensureNoPre019State() throws IOException { + private void ensureNoPre019MetadataFiles() throws IOException { for (Path dataLocation : nodeEnv.nodeDataPaths()) { final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); if (!Files.exists(stateLocation)) { @@ -217,8 +350,24 @@ private void ensureNoPre019State() throws IOException { final String name = stateFile.getFileName().toString(); if (name.startsWith("metadata-")) { throw new IllegalStateException("Detected pre 0.19 metadata file please upgrade to a version before " - + Version.CURRENT.minimumIndexCompatibilityVersion() - + " first to upgrade state structures - metadata found: [" + stateFile.getParent().toAbsolutePath()); + + Version.CURRENT.minimumIndexCompatibilityVersion() + + " first to upgrade state structures - metadata found: [" + stateFile.getParent().toAbsolutePath()); + } + } + } + } + } + + // shard state BWC + private void ensureNoPre019ShardState() throws IOException { + for (Path dataLocation : nodeEnv.nodeDataPaths()) { + final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); + if (Files.exists(stateLocation)) { + try (DirectoryStream stream = Files.newDirectoryStream(stateLocation, "shards-*")) { + for (Path stateFile : stream) { + throw new IllegalStateException("Detected pre 0.19 shard state file please upgrade to a version before " + + Version.CURRENT.minimumIndexCompatibilityVersion() + + " first to upgrade state structures - shard state found: [" + stateFile.getParent().toAbsolutePath()); } } } @@ -235,24 +384,24 @@ private void ensureNoPre019State() throws IOException { */ static MetaData upgradeMetaData(MetaData metaData, MetaDataIndexUpgradeService metaDataIndexUpgradeService, - MetaDataUpgrader metaDataUpgrader) throws IOException { + MetaDataUpgrader metaDataUpgrader) { // upgrade index meta data boolean changed = false; final MetaData.Builder upgradedMetaData = MetaData.builder(metaData); for (IndexMetaData indexMetaData : metaData) { IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, - Version.CURRENT.minimumIndexCompatibilityVersion()); + Version.CURRENT.minimumIndexCompatibilityVersion()); changed |= indexMetaData != newMetaData; upgradedMetaData.put(newMetaData, false); } // upgrade global custom meta data if (applyPluginUpgraders(metaData.getCustoms(), metaDataUpgrader.customMetaDataUpgraders, - upgradedMetaData::removeCustom,upgradedMetaData::putCustom)) { + upgradedMetaData::removeCustom, upgradedMetaData::putCustom)) { changed = true; } // upgrade current templates if (applyPluginUpgraders(metaData.getTemplates(), metaDataUpgrader.indexTemplateMetaDataUpgraders, - upgradedMetaData::removeTemplate, (s, indexTemplateMetaData) -> upgradedMetaData.put(indexTemplateMetaData))) { + upgradedMetaData::removeTemplate, (s, indexTemplateMetaData) -> upgradedMetaData.put(indexTemplateMetaData))) { changed = true; } return changed ? upgradedMetaData.build() : metaData; @@ -280,57 +429,51 @@ private static boolean applyPluginUpgraders(ImmutableOpenMap stream = Files.newDirectoryStream(stateLocation, "shards-*")) { - for (Path stateFile : stream) { - throw new IllegalStateException("Detected pre 0.19 shard state file please upgrade to a version before " - + Version.CURRENT.minimumIndexCompatibilityVersion() - + " first to upgrade state structures - shard state found: [" + stateFile.getParent().toAbsolutePath()); - } - } - } - } - } - /** - * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted. - * Each index state that should be written to disk will be returned. This is only run for data only nodes. - * It will return only the states for indices that actually have a shard allocated on the current node. + * Returns list of {@link IndexMetaDataAction} for each relevant index. + * For each relevant index there are 3 options: + *
    + *
  1. + * {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no + * action is required. + *
  2. + *
  3. + * {@link WriteNewIndexMetaData} - there is no index metadata on disk and index metadata for this index should be written. + *
  4. + *
  5. + * {@link WriteChangedIndexMetaData} - index metadata is already on disk, but index metadata version has changed. Updated + * index metadata should be written to disk. + *
  6. + *
* - * @param previouslyWrittenIndices A list of indices for which the state was already written before - * @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written - * @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is - * persisted now - * @param newMetaData The new metadata - * @return iterable over all indices states that should be written to disk + * @param previouslyWrittenIndices A list of indices for which the state was already written before + * @param relevantIndices The list of indices for which state should potentially be written + * @param previousMetaData The last meta data we know of + * @param newMetaData The new metadata + * @return list of {@link IndexMetaDataAction} for each relevant index. */ - public static Iterable resolveStatesToBeWritten(Set previouslyWrittenIndices, - Set potentiallyUnwrittenIndices, - MetaData previousMetaData, MetaData newMetaData) { - List indicesToWrite = new ArrayList<>(); - for (Index index : potentiallyUnwrittenIndices) { + public static List resolveIndexMetaDataActions(Map previouslyWrittenIndices, + Set relevantIndices, + MetaData previousMetaData, + MetaData newMetaData) { + List actions = new ArrayList<>(); + for (Index index : relevantIndices) { IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index); IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index); - String writeReason = null; - if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) { - writeReason = "freshly created"; + + if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetaData == null) { + actions.add(new WriteNewIndexMetaData(newIndexMetaData)); } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) { - writeReason = "version changed from [" + previousIndexMetaData.getVersion() + "] to [" + - newIndexMetaData.getVersion() + "]"; - } - if (writeReason != null) { - indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason)); + actions.add(new WriteChangedIndexMetaData(previousIndexMetaData, newIndexMetaData)); + } else { + actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index))); } } - return indicesToWrite; + return actions; } - public static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, - Set previouslyWrittenIndices) { + private static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set + previouslyWrittenIndices) { RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); if (newRoutingNode == null) { throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); @@ -356,7 +499,7 @@ public static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, Cl return indices; } - public static Set getRelevantIndicesForMasterEligibleNode(ClusterState state) { + private static Set getRelevantIndicesForMasterEligibleNode(ClusterState state) { Set relevantIndices; relevantIndices = new HashSet<>(); // we have to iterate over the metadata to make sure we also capture closed indices @@ -366,24 +509,81 @@ public static Set getRelevantIndicesForMasterEligibleNode(ClusterState st return relevantIndices; } + /** + * Action to perform with index metadata. + */ + public interface IndexMetaDataAction { + /** + * @return index for index metadata. + */ + Index getIndex(); + + /** + * Executes this action using provided {@link AtomicClusterStateWriter}. + * + * @return new index metadata state generation, to be used in manifest file. + * @throws WriteStateException if exception occurs. + */ + long execute(AtomicClusterStateWriter writer) throws WriteStateException; + } + + public static class KeepPreviousGeneration implements IndexMetaDataAction { + private final Index index; + private final long generation; + + KeepPreviousGeneration(Index index, long generation) { + this.index = index; + this.generation = generation; + } + + @Override + public Index getIndex() { + return index; + } + + @Override + public long execute(AtomicClusterStateWriter writer) { + return generation; + } + } + + public static class WriteNewIndexMetaData implements IndexMetaDataAction { + private final IndexMetaData indexMetaData; + + WriteNewIndexMetaData(IndexMetaData indexMetaData) { + this.indexMetaData = indexMetaData; + } + + @Override + public Index getIndex() { + return indexMetaData.getIndex(); + } + + @Override + public long execute(AtomicClusterStateWriter writer) throws WriteStateException { + return writer.writeIndex("freshly created", indexMetaData); + } + } - public static class IndexMetaWriteInfo { - final IndexMetaData newMetaData; - final String reason; - final IndexMetaData previousMetaData; + public static class WriteChangedIndexMetaData implements IndexMetaDataAction { + private final IndexMetaData newIndexMetaData; + private final IndexMetaData oldIndexMetaData; - public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) { - this.newMetaData = newMetaData; - this.reason = reason; - this.previousMetaData = previousMetaData; + WriteChangedIndexMetaData(IndexMetaData oldIndexMetaData, IndexMetaData newIndexMetaData) { + this.oldIndexMetaData = oldIndexMetaData; + this.newIndexMetaData = newIndexMetaData; } - public IndexMetaData getNewMetaData() { - return newMetaData; + @Override + public Index getIndex() { + return newIndexMetaData.getIndex(); } - public String getReason() { - return reason; + @Override + public long execute(AtomicClusterStateWriter writer) throws WriteStateException { + return writer.writeIndex( + "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]", + newIndexMetaData); } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index 80f3924beeae0..3f28fead29439 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -114,7 +114,6 @@ public void close() { // in order to write the footer we need to prevent closing the actual index input. } })) { - builder.startObject(); toXContent(builder, state); builder.endObject(); @@ -177,20 +176,40 @@ private static void performStateDirectoriesFsync(List> st } } + /** + * Writes the given state to the given directories and performs cleanup of old state files if the write succeeds or + * newly created state file if write fails. + * See also {@link #write(Object, Path...)} and {@link #cleanupOldFiles(long, Path[])}. + */ + public final long writeAndCleanup(final T state, final Path... locations) throws WriteStateException { + return write(state, true, locations); + } + /** * Writes the given state to the given directories. The state is written to a * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to * it's target filename of the pattern {@code {prefix}{version}.st}. * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return - * it. + * it.
+ * This method always performs cleanup of temporary files regardless whether it succeeds or fails. Cleanup logic for state files is + * more involved. + * If this method fails with an exception, it performs cleanup of newly created state file. + * But if this method succeeds, it does not perform cleanup of old state files. + * If this write succeeds, but some further write fails, you may want to rollback the transaction and keep old file around. + * After transaction is finished use {@link #cleanupOldFiles(long, Path[])} for the clean-up. + * If this write is not a part of bigger transaction, consider using {@link #writeAndCleanup(Object, Path...)} method instead. * * @param state the state object to write * @param locations the locations where the state should be written to. * @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}. + * @return generation of newly written state. */ + public final long write(final T state, final Path... locations) throws WriteStateException { + return write(state, false, locations); + } - public final void write(final T state, final Path... locations) throws WriteStateException { + private long write(final T state, boolean cleanup, final Path... locations) throws WriteStateException { if (locations == null) { throw new IllegalArgumentException("Locations must not be null"); } @@ -198,15 +217,16 @@ public final void write(final T state, final Path... locations) throws WriteStat throw new IllegalArgumentException("One or more locations required"); } - long maxStateId; + final long oldGenerationId, newGenerationId; try { - maxStateId = findMaxStateId(prefix, locations) + 1; + oldGenerationId = findMaxGenerationId(prefix, locations); + newGenerationId = oldGenerationId + 1; } catch (Exception e) { - throw new WriteStateException(false, "exception during looking up max state id", e); + throw new WriteStateException(false, "exception during looking up new generation id", e); } - assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; + assert newGenerationId >= 0 : "newGenerationId must be positive but was: [" + oldGenerationId + "]"; - final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; + final String fileName = getStateFileName(newGenerationId); final String tmpFileName = fileName + ".tmp"; List> directories = new ArrayList<>(); @@ -224,6 +244,11 @@ public final void write(final T state, final Path... locations) throws WriteStat copyStateToExtraLocations(directories, tmpFileName); performRenames(tmpFileName, fileName, directories); performStateDirectoriesFsync(directories); + } catch (WriteStateException e) { + if (cleanup) { + cleanupOldFiles(oldGenerationId, locations); + } + throw e; } finally { for (Tuple pathAndDirectory : directories) { deleteFileIgnoreExceptions(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); @@ -231,7 +256,11 @@ public final void write(final T state, final Path... locations) throws WriteStat } } - cleanupOldFiles(fileName, locations); + if (cleanup) { + cleanupOldFiles(newGenerationId, locations); + } + + return newGenerationId; } protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException { @@ -257,7 +286,7 @@ protected XContentBuilder newXContentBuilder(XContentType type, OutputStream str public final T read(NamedXContentRegistry namedXContentRegistry, Path file) throws IOException { try (Directory dir = newDirectory(file.getParent())) { try (IndexInput indexInput = dir.openInput(file.getFileName().toString(), IOContext.DEFAULT)) { - // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here. + // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here. CodecUtil.checksumEntireFile(indexInput); CodecUtil.checkHeader(indexInput, STATE_FILE_CODEC, MIN_COMPATIBLE_STATE_FILE_VERSION, STATE_FILE_VERSION); final XContentType xContentType = XContentType.values()[indexInput.readInt()]; @@ -269,7 +298,7 @@ public final T read(NamedXContentRegistry namedXContentRegistry, Path file) thro try (IndexInput slice = indexInput.slice("state_xcontent", filePointer, contentSize)) { try (XContentParser parser = XContentFactory.xContent(FORMAT) .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, - new InputStreamIndexInput(slice, contentSize))) { + new InputStreamIndexInput(slice, contentSize))) { return fromXContent(parser); } } @@ -284,13 +313,21 @@ protected Directory newDirectory(Path dir) throws IOException { return new SimpleFSDirectory(dir); } - private void cleanupOldFiles(final String currentStateFile, Path[] locations) { + + /** + * Clean ups all state files not matching passed generation. + * + * @param currentGeneration state generation to keep. + * @param locations state paths. + */ + public void cleanupOldFiles(final long currentGeneration, Path[] locations) { + final String fileNameToKeep = getStateFileName(currentGeneration); for (Path location : locations) { logger.trace("cleanupOldFiles: cleaning up {}", location); Path stateLocation = location.resolve(STATE_DIR_NAME); try (Directory stateDir = newDirectory(stateLocation)) { for (String file : stateDir.listAll()) { - if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { + if (file.startsWith(prefix) && file.equals(fileNameToKeep) == false) { deleteFileIgnoreExceptions(stateLocation, stateDir, file); } } @@ -308,7 +345,7 @@ private void cleanupOldFiles(final String currentStateFile, Path[] locations) { * @return maximum id of state file or -1 if no such files are found * @throws IOException if IOException occurs */ - private long findMaxStateId(final String prefix, Path... locations) throws IOException { + private long findMaxGenerationId(final String prefix, Path... locations) throws IOException { long maxId = -1; for (Path dataLocation : locations) { final Path resolve = dataLocation.resolve(STATE_DIR_NAME); @@ -333,7 +370,7 @@ private List findStateFilesByGeneration(final long generation, Path... loc return files; } - final String fileName = prefix + generation + STATE_FILE_EXTENSION; + final String fileName = getStateFileName(generation); for (Path dataLocation : locations) { final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName); if (Files.exists(stateFilePath)) { @@ -345,32 +382,27 @@ private List findStateFilesByGeneration(final long generation, Path... loc return files; } + private String getStateFileName(long generation) { + return prefix + generation + STATE_FILE_EXTENSION; + } + /** - * Tries to load the latest state from the given data-locations. It tries to load the latest state determined by - * the states version from one or more data directories and if none of the latest states can be loaded an exception - * is thrown to prevent accidentally loading a previous state and silently omitting the latest state. + * Tries to load the state of particular generation from the given data-locations. If any of data locations contain state files with + * given generation, state will be loaded from these state files. * - * @param logger a logger instance + * @param logger a logger instance. + * @param generation the generation to be loaded. * @param dataLocations the data-locations to try. - * @return the latest state or null if no state was found. + * @return the state of asked generation or null if no state was found. */ - public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException { - long maxStateId = findMaxStateId(prefix, dataLocations); - List stateFiles = findStateFilesByGeneration(maxStateId, dataLocations); - - if (maxStateId > -1 && stateFiles.isEmpty()) { - throw new IllegalStateException("unable to find state files with state id " + maxStateId + - " returned by findMaxStateId function, in data folders [" + - Arrays.stream(dataLocations).map(Path::toAbsolutePath). - map(Object::toString).collect(Collectors.joining(", ")) + - "], concurrent writes?"); - } + public T loadGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, long generation, Path... dataLocations) { + List stateFiles = findStateFilesByGeneration(generation, dataLocations); final List exceptions = new ArrayList<>(); for (Path stateFile : stateFiles) { try { T state = read(namedXContentRegistry, stateFile); - logger.trace("state id [{}] read from [{}]", maxStateId, stateFile.getFileName()); + logger.trace("generation id [{}] read from [{}]", generation, stateFile.getFileName()); return state; } catch (Exception e) { exceptions.add(new IOException("failed to read " + stateFile.toAbsolutePath(), e)); @@ -388,6 +420,40 @@ public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegis return null; } + /** + * Tries to load the latest state from the given data-locations. + * + * @param logger a logger instance. + * @param dataLocations the data-locations to try. + * @return tuple of the latest state and generation. (-1, null) if no state is found. + */ + public Tuple loadLatestStateWithGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) + throws IOException { + long generation = findMaxGenerationId(prefix, dataLocations); + T state = loadGeneration(logger, namedXContentRegistry, generation, dataLocations); + + if (generation > -1 && state == null) { + throw new IllegalStateException("unable to find state files with generation id " + generation + + " returned by findMaxGenerationId function, in data folders [" + + Arrays.stream(dataLocations).map(Path::toAbsolutePath). + map(Object::toString).collect(Collectors.joining(", ")) + + "], concurrent writes?"); + } + return Tuple.tuple(state, generation); + } + + /** + * Tries to load the latest state from the given data-locations. + * + * @param logger a logger instance. + * @param dataLocations the data-locations to try. + * @return the latest state or null if no state was found. + */ + public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws + IOException { + return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocations).v1(); + } + /** * Deletes all meta state directories recursively for the given data locations * @param dataLocations the data location to delete @@ -399,4 +465,8 @@ public static void deleteMetaState(Path... dataLocations) throws IOException { } IOUtils.rm(stateDirectories); } + + String getPrefix() { + return prefix; + } } diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java b/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java index 24f5fd63662d9..7c4b1cbb33ab6 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaStateService.java @@ -19,55 +19,127 @@ package org.elasticsearch.gateway; -import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Predicate; /** - * Handles writing and loading both {@link MetaData} and {@link IndexMetaData} + * Handles writing and loading {@link Manifest}, {@link MetaData} and {@link IndexMetaData} */ -public class MetaStateService extends AbstractComponent { +public class MetaStateService { + private static final Logger logger = LogManager.getLogger(MetaStateService.class); private final NodeEnvironment nodeEnv; private final NamedXContentRegistry namedXContentRegistry; + // we allow subclasses in tests to redefine formats, e.g. to inject failures + protected MetaDataStateFormat META_DATA_FORMAT = MetaData.FORMAT; + protected MetaDataStateFormat INDEX_META_DATA_FORMAT = IndexMetaData.FORMAT; + protected MetaDataStateFormat MANIFEST_FORMAT = Manifest.FORMAT; + public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) { this.nodeEnv = nodeEnv; this.namedXContentRegistry = namedXContentRegistry; } /** - * Loads the full state, which includes both the global state and all the indices - * meta state. + * Loads the full state, which includes both the global state and all the indices meta data.
+ * When loading, manifest file is consulted (represented by {@link Manifest} class), to load proper generations.
+ * If there is no manifest file on disk, this method fallbacks to BWC mode, where latest generation of global and indices + * metadata is loaded. Please note that currently there is no way to distinguish between manifest file being removed and manifest + * file was not yet created. It means that this method always fallbacks to BWC mode, if there is no manifest file. + * + * @return tuple of {@link Manifest} and {@link MetaData} with global metadata and indices metadata. If there is no state on disk, + * meta state with globalGeneration -1 and empty meta data is returned. + * @throws IOException if some IOException when loading files occurs or there is no metadata referenced by manifest file. */ - MetaData loadFullState() throws IOException { - MetaData globalMetaData = loadGlobalState(); + Tuple loadFullState() throws IOException { + final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); + if (manifest == null) { + return loadFullStateBWC(); + } + + final MetaData.Builder metaDataBuilder; + if (manifest.isGlobalGenerationMissing()) { + metaDataBuilder = MetaData.builder(); + } else { + final MetaData globalMetaData = META_DATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(), + nodeEnv.nodeDataPaths()); + if (globalMetaData != null) { + metaDataBuilder = MetaData.builder(globalMetaData); + } else { + throw new IOException("failed to find global metadata [generation: " + manifest.getGlobalGeneration() + "]"); + } + } + + for (Map.Entry entry : manifest.getIndexGenerations().entrySet()) { + final Index index = entry.getKey(); + final long generation = entry.getValue(); + final String indexFolderName = index.getUUID(); + final IndexMetaData indexMetaData = INDEX_META_DATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation, + nodeEnv.resolveIndexFolder(indexFolderName)); + if (indexMetaData != null) { + metaDataBuilder.put(indexMetaData, false); + } else { + throw new IOException("failed to find metadata for existing index " + index.getName() + " [location: " + indexFolderName + + ", generation: " + generation + "]"); + } + } + + return new Tuple<>(manifest, metaDataBuilder.build()); + } + + /** + * "Manifest-less" BWC version of loading metadata from disk. See also {@link #loadFullState()} + */ + private Tuple loadFullStateBWC() throws IOException { + Map indices = new HashMap<>(); MetaData.Builder metaDataBuilder; + + Tuple metaDataAndGeneration = + META_DATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); + MetaData globalMetaData = metaDataAndGeneration.v1(); + long globalStateGeneration = metaDataAndGeneration.v2(); + if (globalMetaData != null) { metaDataBuilder = MetaData.builder(globalMetaData); + assert Version.CURRENT.major < 8 : "failed to find manifest file, which is mandatory staring with Elasticsearch version 8.0"; } else { metaDataBuilder = MetaData.builder(); } + for (String indexFolderName : nodeEnv.availableIndexFolders()) { - IndexMetaData indexMetaData = IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, - nodeEnv.resolveIndexFolder(indexFolderName)); + Tuple indexMetaDataAndGeneration = + INDEX_META_DATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, + nodeEnv.resolveIndexFolder(indexFolderName)); + assert Version.CURRENT.major < 8 : "failed to find manifest file, which is mandatory staring with Elasticsearch version 8.0"; + IndexMetaData indexMetaData = indexMetaDataAndGeneration.v1(); + long generation = indexMetaDataAndGeneration.v2(); if (indexMetaData != null) { + indices.put(indexMetaData.getIndex(), generation); metaDataBuilder.put(indexMetaData, false); } else { logger.debug("[{}] failed to find metadata for existing index location", indexFolderName); } } - return metaDataBuilder.build(); + + Manifest manifest = new Manifest(globalStateGeneration, indices); + return new Tuple<>(manifest, metaDataBuilder.build()); } /** @@ -75,7 +147,7 @@ MetaData loadFullState() throws IOException { */ @Nullable public IndexMetaData loadIndexState(Index index) throws IOException { - return IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.indexPaths(index)); + return INDEX_META_DATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.indexPaths(index)); } /** @@ -85,15 +157,15 @@ List loadIndicesStates(Predicate excludeIndexPathIdsPredi List indexMetaDataList = new ArrayList<>(); for (String indexFolderName : nodeEnv.availableIndexFolders(excludeIndexPathIdsPredicate)) { assert excludeIndexPathIdsPredicate.test(indexFolderName) == false : - "unexpected folder " + indexFolderName + " which should have been excluded"; - IndexMetaData indexMetaData = IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, - nodeEnv.resolveIndexFolder(indexFolderName)); + "unexpected folder " + indexFolderName + " which should have been excluded"; + IndexMetaData indexMetaData = INDEX_META_DATA_FORMAT.loadLatestState(logger, namedXContentRegistry, + nodeEnv.resolveIndexFolder(indexFolderName)); if (indexMetaData != null) { final String indexPathId = indexMetaData.getIndex().getUUID(); if (indexFolderName.equals(indexPathId)) { indexMetaDataList.add(indexMetaData); } else { - throw new IllegalStateException("[" + indexFolderName+ "] invalid index folder name, rename to [" + indexPathId + "]"); + throw new IllegalStateException("[" + indexFolderName + "] invalid index folder name, rename to [" + indexPathId + "]"); } } else { logger.debug("[{}] failed to find metadata for existing index location", indexFolderName); @@ -102,42 +174,121 @@ List loadIndicesStates(Predicate excludeIndexPathIdsPredi return indexMetaDataList; } + /** + * Loads Manifest file from disk, returns Manifest.empty() if there is no manifest file. + */ + public Manifest loadManifestOrEmpty() throws IOException { + Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); + if (manifest == null) { + manifest = Manifest.empty(); + } + return manifest; + } + /** * Loads the global state, *without* index state, see {@link #loadFullState()} for that. */ MetaData loadGlobalState() throws IOException { - return MetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); + return META_DATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths()); } /** - * Writes the index state. + * Writes manifest file (represented by {@link Manifest}) to disk and performs cleanup of old manifest state file if + * the write succeeds or newly created manifest state if the write fails. * + * @throws WriteStateException if exception when writing state occurs. See also {@link WriteStateException#isDirty()} + */ + public long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException { + logger.trace("[_meta] writing state, reason [{}]", reason); + try { + long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths()); + logger.trace("[_meta] state written (generation: {})", generation); + return generation; + } catch (WriteStateException ex) { + throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex); + } + } + + /** + * Writes the index state. + *

* This method is public for testing purposes. + * + * @throws WriteStateException if exception when writing state occurs. {@link WriteStateException#isDirty()} will always return + * false, because new index state file is not yet referenced by manifest file. */ - public void writeIndex(String reason, IndexMetaData indexMetaData) throws IOException { + public long writeIndex(String reason, IndexMetaData indexMetaData) throws WriteStateException { final Index index = indexMetaData.getIndex(); logger.trace("[{}] writing state, reason [{}]", index, reason); try { - IndexMetaData.FORMAT.write(indexMetaData, - nodeEnv.indexPaths(indexMetaData.getIndex())); + long generation = INDEX_META_DATA_FORMAT.write(indexMetaData, + nodeEnv.indexPaths(indexMetaData.getIndex())); logger.trace("[{}] state written", index); - } catch (Exception ex) { - logger.warn(() -> new ParameterizedMessage("[{}]: failed to write index state", index), ex); - throw new IOException("failed to write state for [" + index + "]", ex); + return generation; + } catch (WriteStateException ex) { + throw new WriteStateException(false, "[" + index + "]: failed to write index state", ex); } } /** * Writes the global state, *without* the indices states. + * + * @throws WriteStateException if exception when writing state occurs. {@link WriteStateException#isDirty()} will always return + * false, because new global state file is not yet referenced by manifest file. */ - void writeGlobalState(String reason, MetaData metaData) throws IOException { - logger.trace("[_global] writing state, reason [{}]", reason); + long writeGlobalState(String reason, MetaData metaData) throws WriteStateException { + logger.trace("[_global] writing state, reason [{}]", reason); try { - MetaData.FORMAT.write(metaData, nodeEnv.nodeDataPaths()); + long generation = META_DATA_FORMAT.write(metaData, nodeEnv.nodeDataPaths()); logger.trace("[_global] state written"); - } catch (Exception ex) { - logger.warn("[_global]: failed to write global state", ex); - throw new IOException("failed to write global state", ex); + return generation; + } catch (WriteStateException ex) { + throw new WriteStateException(false, "[_global]: failed to write global state", ex); } } -} + + /** + * Removes old state files in global state directory. + * + * @param currentGeneration current state generation to keep in the directory. + */ + void cleanupGlobalState(long currentGeneration) { + META_DATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPaths()); + } + + /** + * Removes old state files in index directory. + * + * @param index index to perform clean up on. + * @param currentGeneration current state generation to keep in the index directory. + */ + public void cleanupIndex(Index index, long currentGeneration) { + INDEX_META_DATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.indexPaths(index)); + } + + /** + * Writes index metadata and updates manifest file accordingly. + * Used by tests. + */ + public void writeIndexAndUpdateManifest(String reason, IndexMetaData metaData) throws IOException { + long generation = writeIndex(reason, metaData); + Manifest manifest = loadManifestOrEmpty(); + Map indices = new HashMap<>(manifest.getIndexGenerations()); + indices.put(metaData.getIndex(), generation); + manifest = new Manifest(manifest.getGlobalGeneration(), indices); + writeManifestAndCleanup(reason, manifest); + cleanupIndex(metaData.getIndex(), generation); + } + + /** + * Writes global metadata and updates manifest file accordingly. + * Used by tests. + */ + public void writeGlobalStateAndUpdateManifest(String reason, MetaData metaData) throws IOException { + long generation = writeGlobalState(reason, metaData); + Manifest manifest = loadManifestOrEmpty(); + manifest = new Manifest(generation, manifest.getIndexGenerations()); + writeManifestAndCleanup(reason, manifest); + cleanupGlobalState(generation); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java index f4d1bf1b57352..24b75f5ace83c 100644 --- a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java @@ -90,7 +90,7 @@ protected NodesGatewayMetaState newResponse(Request request, List indices = new HashMap<>(state.getIndexGenerations()); + if (introduceErrors) { + switch (randomInt(3)) { + case 0: { + generation = generation + 1; + break; + } + case 1: { + indices.remove(randomFrom(indices.keySet())); + break; + } + case 2: { + Tuple indexEntry = randomIndexEntry(); + indices.put(indexEntry.v1(), indexEntry.v2()); + break; + } + case 3: { + Index index = randomFrom(indices.keySet()); + indices.compute(index, (i, g) -> g + 1); + break; + } + } + } + return new Manifest(generation, indices); + } + + private Tuple randomIndexEntry() { + final String name = randomAlphaOfLengthBetween(4, 15); + final String uuid = UUIDs.randomBase64UUID(); + final Index index = new Index(name, uuid); + final long indexGeneration = randomNonNegativeLong(); + return Tuple.tuple(index, indexGeneration); + } + + private Manifest randomManifest() { + long generation = randomNonNegativeLong(); + Map indices = new HashMap<>(); + for (int i = 0; i < randomIntBetween(1, 5); i++) { + Tuple indexEntry = randomIndexEntry(); + indices.put(indexEntry.v1(), indexEntry.v2()); + } + return new Manifest(generation, indices); + } + + public void testEqualsAndHashCode() { + checkEqualsAndHashCode(randomManifest(), org -> copyState(org, false), org -> copyState(org, true)); + } + + public void testXContent() throws IOException { + Manifest state = randomManifest(); + + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + Manifest.FORMAT.toXContent(builder, state); + builder.endObject(); + BytesReference bytes = BytesReference.bytes(builder); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, bytes)) { + assertThat(Manifest.fromXContent(parser), equalTo(state)); + } + } + + public void testEmptyManifest() { + assertTrue(Manifest.empty().isEmpty()); + assertFalse(randomManifest().isEmpty()); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index ff8393b659d14..414f37799678d 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -395,18 +395,24 @@ public void testRecoverBrokenIndexMetadata() throws Exception { .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet(); } ClusterState state = client().admin().cluster().prepareState().get().getState(); - IndexMetaData metaData = state.getMetaData().index("test"); - for (NodeEnvironment services : internalCluster().getInstances(NodeEnvironment.class)) { - IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings()) + + final IndexMetaData metaData = state.getMetaData().index("test"); + final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion().id) - // this is invalid but should be archived + // this is invalid but should be archived .put("index.similarity.BM25.type", "classic") - // this one is not validated ahead of time and breaks allocation + // this one is not validated ahead of time and breaks allocation .put("index.analysis.filter.myCollator.type", "icu_collation") - ).build(); - IndexMetaData.FORMAT.write(brokenMeta, services.indexPaths(brokenMeta.getIndex())); - } - internalCluster().fullRestart(); + ).build(); + internalCluster().fullRestart(new RestartCallback(){ + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName); + metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta); + return super.onNodeStopped(nodeName); + } + }); + // ensureGreen(closedIndex) waits for the index to show up in the metadata // this is crucial otherwise the state call below might not contain the index yet ensureGreen(metaData.getIndex().getName()); @@ -457,13 +463,19 @@ public void testRecoverMissingAnalyzer() throws Exception { .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet(); } ClusterState state = client().admin().cluster().prepareState().get().getState(); - IndexMetaData metaData = state.getMetaData().index("test"); - for (NodeEnvironment services : internalCluster().getInstances(NodeEnvironment.class)) { - IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings() + + final IndexMetaData metaData = state.getMetaData().index("test"); + final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings() .filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build(); - IndexMetaData.FORMAT.write(brokenMeta, services.indexPaths(brokenMeta.getIndex())); - } - internalCluster().fullRestart(); + internalCluster().fullRestart(new RestartCallback(){ + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName); + metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta); + return super.onNodeStopped(nodeName); + } + }); + // ensureGreen(closedIndex) waits for the index to show up in the metadata // this is crucial otherwise the state call below might not contain the index yet ensureGreen(metaData.getIndex().getName()); @@ -494,14 +506,20 @@ public void testArchiveBrokenClusterSettings() throws Exception { .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet(); } ClusterState state = client().admin().cluster().prepareState().get().getState(); - MetaData metaData = state.getMetaData(); - for (NodeEnvironment nodeEnv : internalCluster().getInstances(NodeEnvironment.class)) { - MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder() + + final MetaData metaData = state.getMetaData(); + final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder() .put(metaData.persistentSettings()).put("this.is.unknown", true) .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), "broken").build()).build(); - MetaData.FORMAT.write(brokenMeta, nodeEnv.nodeDataPaths()); - } - internalCluster().fullRestart(); + internalCluster().fullRestart(new RestartCallback(){ + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName); + metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta); + return super.onNodeStopped(nodeName); + } + }); + ensureYellow("test"); // wait for state recovery state = client().admin().cluster().prepareState().get().getState(); assertEquals("true", state.metaData().persistentSettings().get("archived.this.is.unknown")); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java index 14f3c212c464c..901b70f7f7449 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java @@ -19,12 +19,14 @@ package org.elasticsearch.gateway; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -32,143 +34,113 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.test.TestCustomMetaData; +import org.mockito.ArgumentCaptor; +import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -import static java.util.Collections.emptySet; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; -/** - * Test IndexMetaState for master and data only nodes return correct list of indices to write - * There are many parameters: - * - meta state is not in memory - * - meta state is in memory with old version/ new version - * - meta state is in memory with new version - * - version changed in cluster state event/ no change - * - node is data only node - * - node is master eligible - * for data only nodes: shard initializing on shard - */ public class GatewayMetaStateTests extends ESAllocationTestCase { - ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) { - //ridiculous settings to make sure we don't run into uninitialized because fo default - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 100) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) - .build()); - ClusterState newClusterState, previousClusterState; - MetaData metaDataOldClusterState = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2)) - .build(); - - RoutingTable routingTableOldClusterState = RoutingTable.builder() - .addAsNew(metaDataOldClusterState.index("test")) - .build(); + private ClusterState noIndexClusterState(boolean masterEligible) { + MetaData metaData = MetaData.builder().build(); + RoutingTable routingTable = RoutingTable.builder().build(); - // assign all shards - ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metaData(metaDataOldClusterState) - .routingTable(routingTableOldClusterState) + return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData) + .routingTable(routingTable) .nodes(generateDiscoveryNodes(masterEligible)) .build(); - // new cluster state will have initializing shards on node 1 - RoutingTable routingTableNewClusterState = strategy.reroute(init, "reroute").routingTable(); - if (initializing == false) { - // pretend all initialized, nothing happened - ClusterState temp = ClusterState.builder(init).routingTable(routingTableNewClusterState) - .metaData(metaDataOldClusterState).build(); - routingTableNewClusterState = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)) - .routingTable(); - routingTableOldClusterState = routingTableNewClusterState; - - } else { - // nothing to do, we have one routing table with unassigned and one with initializing - } + } - // create new meta data either with version changed or not - MetaData metaDataNewClusterState = MetaData.builder() - .put(init.metaData().index("test"), versionChanged) + private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + MetaData metaData = MetaData.builder() + .put(indexMetaData, false) .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); - // create the cluster states with meta data and routing tables as computed before - previousClusterState = ClusterState.builder(init) - .metaData(metaDataOldClusterState) - .routingTable(routingTableOldClusterState) + return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData) + .routingTable(routingTable) .nodes(generateDiscoveryNodes(masterEligible)) .build(); - newClusterState = ClusterState.builder(previousClusterState).routingTable(routingTableNewClusterState) - .metaData(metaDataNewClusterState).version(previousClusterState.getVersion() + 1).build(); - - ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState); - assertThat(event.state().version(), equalTo(event.previousState().version() + 1)); - return event; } - ClusterChangedEvent generateCloseEvent(boolean masterEligible) { - //ridiculous settings to make sure we don't run into uninitialized because fo default + private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData, boolean masterEligible) { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 100) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) .build()); - ClusterState newClusterState, previousClusterState; - MetaData metaDataIndexCreated = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(2)) - .build(); - RoutingTable routingTableIndexCreated = RoutingTable.builder() - .addAsNew(metaDataIndexCreated.index("test")) - .build(); + ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetaData, masterEligible); + RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable(); - // assign all shards - ClusterState init = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metaData(metaDataIndexCreated) - .routingTable(routingTableIndexCreated) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - RoutingTable routingTableInitializing = strategy.reroute(init, "reroute").routingTable(); - ClusterState temp = ClusterState.builder(init).routingTable(routingTableInitializing).build(); - RoutingTable routingTableStarted = strategy.applyStartedShards(temp, temp.getRoutingNodes().shardsWithState(INITIALIZING)) - .routingTable(); - - // create new meta data either with version changed or not - MetaData metaDataStarted = MetaData.builder() - .put(init.metaData().index("test"), true) + MetaData metaDataNewClusterState = MetaData.builder() + .put(oldClusterState.metaData().index("test"), false) .build(); - // create the cluster states with meta data and routing tables as computed before - MetaData metaDataClosed = MetaData.builder() + return ClusterState.builder(oldClusterState).routingTable(routingTable) + .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); + } + + private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible); + + MetaData metaDataNewClusterState = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE) - .numberOfShards(5).numberOfReplicas(2)).version(metaDataStarted.version() + 1) + .numberOfShards(5).numberOfReplicas(2)) + .version(oldClusterState.metaData().version() + 1) .build(); - previousClusterState = ClusterState.builder(init) - .metaData(metaDataStarted) - .routingTable(routingTableStarted) - .nodes(generateDiscoveryNodes(masterEligible)) + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaDataNewClusterState.index("test")) + .build(); + + return ClusterState.builder(oldClusterState).routingTable(routingTable) + .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); + } + + private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible); + + MetaData metaDataNewClusterState = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN) + .numberOfShards(5).numberOfReplicas(2)) + .version(oldClusterState.metaData().version() + 1) .build(); - newClusterState = ClusterState.builder(previousClusterState) - .routingTable(routingTableIndexCreated) - .metaData(metaDataClosed) - .version(previousClusterState.getVersion() + 1).build(); - ClusterChangedEvent event = new ClusterChangedEvent("test", newClusterState, previousClusterState); - assertThat(event.state().version(), equalTo(event.previousState().version() + 1)); - return event; + return ClusterState.builder(oldClusterState) + .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); } private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) { @@ -177,80 +149,278 @@ private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) { .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node"); } - public void assertState(ClusterChangedEvent event, - boolean stateInMemory, - boolean expectMetaData) throws Exception { - MetaData inMemoryMetaData = null; - Set oldIndicesList = emptySet(); - if (stateInMemory) { - inMemoryMetaData = event.previousState().metaData(); - oldIndicesList = GatewayMetaState.getRelevantIndices(event.previousState(), event.previousState(), oldIndicesList); - } - Set newIndicesList = GatewayMetaState.getRelevantIndices(event.state(),event.previousState(), oldIndicesList); - // third, get the actual write info - Iterator indices = GatewayMetaState.resolveStatesToBeWritten(oldIndicesList, newIndicesList, - inMemoryMetaData, event.state().metaData()).iterator(); - - if (expectMetaData) { - assertThat(indices.hasNext(), equalTo(true)); - assertThat(indices.next().getNewMetaData().getIndex().getName(), equalTo("test")); - assertThat(indices.hasNext(), equalTo(false)); + private Set randomPrevWrittenIndices(IndexMetaData indexMetaData) { + if (randomBoolean()) { + return Collections.singleton(indexMetaData.getIndex()); } else { - assertThat(indices.hasNext(), equalTo(false)); + return Collections.emptySet(); } } - public void testVersionChangeIsAlwaysWritten() throws Exception { - // test that version changes are always written - boolean initializing = randomBoolean(); - boolean versionChanged = true; - boolean stateInMemory = randomBoolean(); - boolean masterEligible = randomBoolean(); - boolean expectMetaData = true; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - assertState(event, stateInMemory, expectMetaData); - } - - public void testNewShardsAlwaysWritten() throws Exception { - // make sure new shards on data only node always written - boolean initializing = true; - boolean versionChanged = randomBoolean(); - boolean stateInMemory = randomBoolean(); - boolean masterEligible = false; - boolean expectMetaData = true; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - assertState(event, stateInMemory, expectMetaData); - } - - public void testAllUpToDateNothingWritten() throws Exception { - // make sure state is not written again if we wrote already - boolean initializing = false; - boolean versionChanged = false; - boolean stateInMemory = true; - boolean masterEligible = randomBoolean(); - boolean expectMetaData = false; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - assertState(event, stateInMemory, expectMetaData); + private IndexMetaData createIndexMetaData(String name) { + return IndexMetaData.builder(name). + settings(settings(Version.CURRENT)). + numberOfShards(5). + numberOfReplicas(2). + build(); } - public void testNoWriteIfNothingChanged() throws Exception { - boolean initializing = false; - boolean versionChanged = false; - boolean stateInMemory = true; - boolean masterEligible = randomBoolean(); - boolean expectMetaData = false; - ClusterChangedEvent event = generateEvent(initializing, versionChanged, masterEligible); - ClusterChangedEvent newEventWithNothingChanged = new ClusterChangedEvent("test cluster state", event.state(), event.state()); - assertState(newEventWithNothingChanged, stateInMemory, expectMetaData); + public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = GatewayMetaState.getRelevantIndices( + clusterStateWithUnassignedIndex(indexMetaData, true), + noIndexClusterState(true), + randomPrevWrittenIndices(indexMetaData)); + assertThat(indices.size(), equalTo(1)); } - public void testWriteClosedIndex() throws Exception { - // test that the closing of an index is written also on data only node + public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = GatewayMetaState.getRelevantIndices( + clusterStateWithUnassignedIndex(indexMetaData, false), + noIndexClusterState(false), + randomPrevWrittenIndices(indexMetaData)); + assertThat(indices.size(), equalTo(0)); + } + + public void testGetRelevantIndicesWithAssignedShards() { + IndexMetaData indexMetaData = createIndexMetaData("test"); boolean masterEligible = randomBoolean(); - boolean expectMetaData = true; - boolean stateInMemory = true; - ClusterChangedEvent event = generateCloseEvent(masterEligible); - assertState(event, stateInMemory, expectMetaData); + Set indices = GatewayMetaState.getRelevantIndices( + clusterStateWithAssignedIndex(indexMetaData, masterEligible), + clusterStateWithUnassignedIndex(indexMetaData, masterEligible), + randomPrevWrittenIndices(indexMetaData)); + assertThat(indices.size(), equalTo(1)); + } + + public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = GatewayMetaState.getRelevantIndices( + clusterStateWithClosedIndex(indexMetaData, false), + clusterStateWithAssignedIndex(indexMetaData, false), + Collections.singleton(indexMetaData.getIndex())); + assertThat(indices.size(), equalTo(1)); + } + + public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = GatewayMetaState.getRelevantIndices( + clusterStateWithJustOpenedIndex(indexMetaData, false), + clusterStateWithClosedIndex(indexMetaData, false), + Collections.emptySet()); + assertThat(indices.size(), equalTo(0)); + } + + public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = GatewayMetaState.getRelevantIndices( + clusterStateWithJustOpenedIndex(indexMetaData, false), + clusterStateWithClosedIndex(indexMetaData, false), + Collections.singleton(indexMetaData.getIndex())); + assertThat(indices.size(), equalTo(1)); + } + + public void testResolveStatesToBeWritten() throws WriteStateException { + Map indices = new HashMap<>(); + Set relevantIndices = new HashSet<>(); + + IndexMetaData removedIndex = createIndexMetaData("removed_index"); + indices.put(removedIndex.getIndex(), 1L); + + IndexMetaData versionChangedIndex = createIndexMetaData("version_changed_index"); + indices.put(versionChangedIndex.getIndex(), 2L); + relevantIndices.add(versionChangedIndex.getIndex()); + + IndexMetaData notChangedIndex = createIndexMetaData("not_changed_index"); + indices.put(notChangedIndex.getIndex(), 3L); + relevantIndices.add(notChangedIndex.getIndex()); + + IndexMetaData newIndex = createIndexMetaData("new_index"); + relevantIndices.add(newIndex.getIndex()); + + MetaData oldMetaData = MetaData.builder() + .put(removedIndex, false) + .put(versionChangedIndex, false) + .put(notChangedIndex, false) + .build(); + + MetaData newMetaData = MetaData.builder() + .put(versionChangedIndex, true) + .put(notChangedIndex, false) + .put(newIndex, false) + .build(); + + IndexMetaData newVersionChangedIndex = newMetaData.index(versionChangedIndex.getIndex()); + + List actions = + GatewayMetaState.resolveIndexMetaDataActions(indices, relevantIndices, oldMetaData, newMetaData); + + assertThat(actions, hasSize(3)); + + for (GatewayMetaState.IndexMetaDataAction action : actions) { + if (action instanceof GatewayMetaState.KeepPreviousGeneration) { + assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex())); + GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class); + assertThat(action.execute(writer), equalTo(3L)); + verifyZeroInteractions(writer); + } + if (action instanceof GatewayMetaState.WriteNewIndexMetaData) { + assertThat(action.getIndex(), equalTo(newIndex.getIndex())); + GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class); + when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L); + assertThat(action.execute(writer), equalTo(0L)); + } + if (action instanceof GatewayMetaState.WriteChangedIndexMetaData) { + assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex())); + GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class); + when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L); + assertThat(action.execute(writer), equalTo(3L)); + ArgumentCaptor reason = ArgumentCaptor.forClass(String.class); + verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex)); + assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion()))); + assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion()))); + } + } + } + + private static class MetaStateServiceWithFailures extends MetaStateService { + private final int invertedFailRate; + private boolean failRandomly; + + private MetaDataStateFormat wrap(MetaDataStateFormat format) { + return new MetaDataStateFormat(format.getPrefix()) { + @Override + public void toXContent(XContentBuilder builder, T state) throws IOException { + format.toXContent(builder, state); + } + + @Override + public T fromXContent(XContentParser parser) throws IOException { + return format.fromXContent(parser); + } + + @Override + protected Directory newDirectory(Path dir) { + MockDirectoryWrapper mock = newMockFSDirectory(dir); + if (failRandomly) { + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + int r = randomIntBetween(0, invertedFailRate); + if (r == 0) { + throw new MockDirectoryWrapper.FakeIOException(); + } + } + }; + mock.failOn(fail); + } + closeAfterSuite(mock); + return mock; + } + }; + } + + MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) { + super(nodeEnv, namedXContentRegistry); + META_DATA_FORMAT = wrap(MetaData.FORMAT); + INDEX_META_DATA_FORMAT = wrap(IndexMetaData.FORMAT); + MANIFEST_FORMAT = wrap(Manifest.FORMAT); + failRandomly = false; + this.invertedFailRate = invertedFailRate; + } + + void failRandomly() { + failRandomly = true; + } + + void noFailures() { + failRandomly = false; + } + } + + private boolean metaDataEquals(MetaData md1, MetaData md2) { + boolean equals = MetaData.isGlobalStateEquals(md1, md2); + + for (IndexMetaData imd : md1) { + IndexMetaData imd2 = md2.index(imd.getIndex()); + equals = equals && imd.equals(imd2); + } + + for (IndexMetaData imd : md2) { + IndexMetaData imd2 = md1.index(imd.getIndex()); + equals = equals && imd.equals(imd2); + } + return equals; + } + + private static MetaData randomMetaDataForTx() { + int settingNo = randomIntBetween(0, 10); + MetaData.Builder builder = MetaData.builder() + .persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build()); + int numOfIndices = randomIntBetween(0, 3); + + for (int i = 0; i < numOfIndices; i++) { + int indexNo = randomIntBetween(0, 50); + IndexMetaData indexMetaData = IndexMetaData.builder("index" + indexNo).settings( + Settings.builder() + .put(IndexMetaData.SETTING_INDEX_UUID, "index" + indexNo) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .build() + ).build(); + builder.put(indexMetaData, false); + } + return builder.build(); + } + + public void testAtomicityWithFailures() throws IOException { + try (NodeEnvironment env = newNodeEnvironment()) { + MetaStateServiceWithFailures metaStateService = + new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry()); + + // We only guarantee atomicity of writes, if there is initial Manifest file + Manifest manifest = Manifest.empty(); + MetaData metaData = MetaData.EMPTY_META_DATA; + metaStateService.writeManifestAndCleanup("startup", Manifest.empty()); + + metaStateService.failRandomly(); + Set possibleMetaData = new HashSet<>(); + possibleMetaData.add(metaData); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + GatewayMetaState.AtomicClusterStateWriter writer = + new GatewayMetaState.AtomicClusterStateWriter(metaStateService, manifest); + metaData = randomMetaDataForTx(); + Map indexGenerations = new HashMap<>(); + + try { + long globalGeneration = writer.writeGlobalState("global", metaData); + + for (IndexMetaData indexMetaData : metaData) { + long generation = writer.writeIndex("index", indexMetaData); + indexGenerations.put(indexMetaData.getIndex(), generation); + } + + Manifest newManifest = new Manifest(globalGeneration, indexGenerations); + writer.writeManifestAndCleanup("manifest", newManifest); + possibleMetaData.clear(); + possibleMetaData.add(metaData); + manifest = newManifest; + } catch (WriteStateException e) { + if (e.isDirty()) { + possibleMetaData.add(metaData); + } + } + } + + metaStateService.noFailures(); + + Tuple manifestAndMetaData = metaStateService.loadFullState(); + MetaData loadedMetaData = manifestAndMetaData.v2(); + + assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData))); + } } public void testAddCustomMetaDataOnUpgrade() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index 92057723a3a58..a7f24cdba3a52 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.DirectoryStream; @@ -71,11 +70,11 @@ public class MetaDataStateFormatTests extends ESTestCase { /** * Ensure we can read a pre-generated cluster state. */ - public void testReadClusterState() throws URISyntaxException, IOException { + public void testReadClusterState() throws IOException { final MetaDataStateFormat format = new MetaDataStateFormat("global-") { @Override - public void toXContent(XContentBuilder builder, MetaData state) throws IOException { + public void toXContent(XContentBuilder builder, MetaData state) { fail("this test doesn't write"); } @@ -104,7 +103,7 @@ public void testReadWriteState() throws IOException { Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - format.write(state, dirs); + format.writeAndCleanup(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); assertEquals(list.length, 1); @@ -119,7 +118,7 @@ public void testReadWriteState() throws IOException { } DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - format.write(state2, dirs); + format.writeAndCleanup(state2, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -146,7 +145,7 @@ public void testVersionMismatch() throws IOException { Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - format.write(state, dirs); + format.writeAndCleanup(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); assertEquals(list.length, 1); @@ -170,7 +169,7 @@ public void testCorruption() throws IOException { Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - format.write(state, dirs); + format.writeAndCleanup(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); assertEquals(list.length, 1); @@ -193,8 +192,7 @@ public void testCorruption() throws IOException { } } - public static void corruptFile(Path file, Logger logger) throws IOException { - Path fileToCorrupt = file; + public static void corruptFile(Path fileToCorrupt, Logger logger) throws IOException { try (SimpleFSDirectory dir = new SimpleFSDirectory(fileToCorrupt.getParent())) { long checksumBeforeCorruption; try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) { @@ -248,7 +246,7 @@ public void testLoadState() throws IOException { dirs[i] = createTempDir(); Files.createDirectories(dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME)); for (int j = 0; j < numStates; j++) { - format.write(meta.get(j), dirs[i]); + format.writeAndCleanup(meta.get(j), dirs[i]); if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily // need here.... Path file = dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME).resolve("global-" + j + ".st"); @@ -299,7 +297,7 @@ private DummyState writeAndReadStateSuccessfully(Format format, Path... paths) t format.noFailures(); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - format.write(state, paths); + format.writeAndCleanup(state, paths); assertEquals(state, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); ensureOnlyOneStateFile(paths); return state; @@ -324,7 +322,7 @@ public void testFailWriteAndReadPreviousState() throws IOException { Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.writeAndCleanup(newState, path)); assertFalse(ex.isDirty()); format.noFailures(); @@ -347,7 +345,7 @@ public void testFailWriteAndReadAnyState() throws IOException { DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); possibleStates.add(newState); - WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.writeAndCleanup(newState, path)); assertTrue(ex.isDirty()); format.noFailures(); @@ -370,7 +368,7 @@ public void testFailCopyTmpFileToExtraLocation() throws IOException { format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, paths)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.writeAndCleanup(newState, paths)); assertFalse(ex.isDirty()); format.noFailures(); @@ -396,7 +394,7 @@ public void testFailRandomlyAndReadAnyState() throws IOException { DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); try { - format.write(newState, paths); + format.writeAndCleanup(newState, paths); possibleStates.clear(); possibleStates.add(newState); } catch (WriteStateException e) { @@ -481,7 +479,6 @@ private enum FailureMode { this.failureMode = FailureMode.NO_FAILURES; } - @Override public void toXContent(XContentBuilder builder, DummyState state) throws IOException { state.toXContent(builder, null); @@ -492,7 +489,6 @@ public DummyState fromXContent(XContentParser parser) throws IOException { return new DummyState().parse(parser); } - public void noFailures() { this.failureMode = FailureMode.NO_FAILURES; } diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java index 81d1442727de7..1820c904b84f0 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java @@ -191,7 +191,7 @@ private boolean indexDirectoryExists(String nodeName, Index index) { private ImmutableOpenMap getIndicesMetaDataOnNode(String nodeName) throws Exception { GatewayMetaState nodeMetaState = ((InternalTestCluster) cluster()).getInstance(GatewayMetaState.class, nodeName); - MetaData nodeMetaData = nodeMetaState.loadMetaState(); + MetaData nodeMetaData = nodeMetaState.loadMetaData(); return nodeMetaData.getIndices(); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java index 938c28fe855c9..28dd2f8ba1759 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaStateServiceTests.java @@ -20,84 +20,184 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; +import java.util.HashMap; + import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.nullValue; public class MetaStateServiceTests extends ESTestCase { - private static Settings indexSettings = Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .build(); - public void testWriteLoadIndex() throws Exception { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); + private NodeEnvironment env; + private MetaStateService metaStateService; + + @Override + public void setUp() throws Exception { + super.setUp(); + env = newNodeEnvironment(); + metaStateService = new MetaStateService(env, xContentRegistry()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + env.close(); + } + + private static IndexMetaData indexMetaData(String name) { + return IndexMetaData.builder(name).settings( + Settings.builder() + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .build() + ).build(); + } - IndexMetaData index = IndexMetaData.builder("test1").settings(indexSettings).build(); - metaStateService.writeIndex("test_write", index); - assertThat(metaStateService.loadIndexState(index.getIndex()), equalTo(index)); - } + public void testWriteLoadIndex() throws Exception { + IndexMetaData index = indexMetaData("test1"); + metaStateService.writeIndex("test_write", index); + assertThat(metaStateService.loadIndexState(index.getIndex()), equalTo(index)); } public void testLoadMissingIndex() throws Exception { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); - assertThat(metaStateService.loadIndexState(new Index("test1", "test1UUID")), nullValue()); - } + assertThat(metaStateService.loadIndexState(new Index("test1", "test1UUID")), nullValue()); } public void testWriteLoadGlobal() throws Exception { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); - - MetaData metaData = MetaData.builder() - .persistentSettings(Settings.builder().put("test1", "value1").build()) - .build(); - metaStateService.writeGlobalState("test_write", metaData); - assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metaData.persistentSettings())); - } + MetaData metaData = MetaData.builder() + .persistentSettings(Settings.builder().put("test1", "value1").build()) + .build(); + metaStateService.writeGlobalState("test_write", metaData); + assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metaData.persistentSettings())); } public void testWriteGlobalStateWithIndexAndNoIndexIsLoaded() throws Exception { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); - - MetaData metaData = MetaData.builder() - .persistentSettings(Settings.builder().put("test1", "value1").build()) - .build(); - IndexMetaData index = IndexMetaData.builder("test1").settings(indexSettings).build(); - MetaData metaDataWithIndex = MetaData.builder(metaData).put(index, true).build(); - - metaStateService.writeGlobalState("test_write", metaDataWithIndex); - assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metaData.persistentSettings())); - assertThat(metaStateService.loadGlobalState().hasIndex("test1"), equalTo(false)); - } + MetaData metaData = MetaData.builder() + .persistentSettings(Settings.builder().put("test1", "value1").build()) + .build(); + IndexMetaData index = indexMetaData("test1"); + MetaData metaDataWithIndex = MetaData.builder(metaData).put(index, true).build(); + + metaStateService.writeGlobalState("test_write", metaDataWithIndex); + assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metaData.persistentSettings())); + assertThat(metaStateService.loadGlobalState().hasIndex("test1"), equalTo(false)); + } + + public void testLoadFullStateBWC() throws Exception { + IndexMetaData indexMetaData = indexMetaData("test1"); + MetaData metaData = MetaData.builder() + .persistentSettings(Settings.builder().put("test1", "value1").build()) + .put(indexMetaData, true) + .build(); + + long globalGeneration = metaStateService.writeGlobalState("test_write", metaData); + long indexGeneration = metaStateService.writeIndex("test_write", indexMetaData); + + Tuple manifestAndMetaData = metaStateService.loadFullState(); + Manifest manifest = manifestAndMetaData.v1(); + assertThat(manifest.getGlobalGeneration(), equalTo(globalGeneration)); + assertThat(manifest.getIndexGenerations(), hasKey(indexMetaData.getIndex())); + assertThat(manifest.getIndexGenerations().get(indexMetaData.getIndex()), equalTo(indexGeneration)); + + MetaData loadedMetaData = manifestAndMetaData.v2(); + assertThat(loadedMetaData.persistentSettings(), equalTo(metaData.persistentSettings())); + assertThat(loadedMetaData.hasIndex("test1"), equalTo(true)); + assertThat(loadedMetaData.index("test1"), equalTo(indexMetaData)); + } + + public void testLoadEmptyStateNoManifest() throws IOException { + Tuple manifestAndMetaData = metaStateService.loadFullState(); + + Manifest manifest = manifestAndMetaData.v1(); + assertTrue(manifest.isEmpty()); + + MetaData metaData = manifestAndMetaData.v2(); + assertTrue(MetaData.isGlobalStateEquals(metaData, MetaData.EMPTY_META_DATA)); + } + + public void testLoadEmptyStateWithManifest() throws IOException { + Manifest manifest = Manifest.empty(); + metaStateService.writeManifestAndCleanup("test", manifest); + + Tuple manifestAndMetaData = metaStateService.loadFullState(); + assertTrue(manifestAndMetaData.v1().isEmpty()); + MetaData metaData = manifestAndMetaData.v2(); + assertTrue(MetaData.isGlobalStateEquals(metaData, MetaData.EMPTY_META_DATA)); + } + + public void testLoadFullStateMissingGlobalMetaData() throws IOException { + IndexMetaData index = indexMetaData("test1"); + long indexGeneration = metaStateService.writeIndex("test", index); + Manifest manifest = new Manifest(Manifest.empty().getGlobalGeneration(), new HashMap() {{ + put(index.getIndex(), indexGeneration); + }}); + assertTrue(manifest.isGlobalGenerationMissing()); + metaStateService.writeManifestAndCleanup("test", manifest); + + Tuple manifestAndMetaData = metaStateService.loadFullState(); + assertThat(manifestAndMetaData.v1(), equalTo(manifest)); + MetaData loadedMetaData = manifestAndMetaData.v2(); + assertTrue(MetaData.isGlobalStateEquals(loadedMetaData, MetaData.EMPTY_META_DATA)); + assertThat(loadedMetaData.hasIndex("test1"), equalTo(true)); + assertThat(loadedMetaData.index("test1"), equalTo(index)); } - public void testLoadGlobal() throws Exception { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); + public void testLoadFullStateAndUpdate() throws IOException { + IndexMetaData index = indexMetaData("test1"); + MetaData metaData = MetaData.builder() + .persistentSettings(Settings.builder().put("test1", "value1").build()) + .put(index, true) + .build(); + + long globalGeneration = metaStateService.writeGlobalState("first global state write", metaData); + long indexGeneration = metaStateService.writeIndex("first index state write", index); + + Manifest manifest = new Manifest(globalGeneration, new HashMap() {{ + put(index.getIndex(), indexGeneration); + }}); + + metaStateService.writeManifestAndCleanup("first manifest write", manifest); + + MetaData newMetaData = MetaData.builder() + .persistentSettings(Settings.builder().put("test1", "value2").build()) + .put(index, true) + .build(); + globalGeneration = metaStateService.writeGlobalState("second global state write", newMetaData); + + Tuple manifestAndMetaData = metaStateService.loadFullState(); + assertThat(manifestAndMetaData.v1(), equalTo(manifest)); + + MetaData loadedMetaData = manifestAndMetaData.v2(); + assertThat(loadedMetaData.persistentSettings(), equalTo(metaData.persistentSettings())); + assertThat(loadedMetaData.hasIndex("test1"), equalTo(true)); + assertThat(loadedMetaData.index("test1"), equalTo(index)); + + manifest = new Manifest(globalGeneration, new HashMap() {{ + put(index.getIndex(), indexGeneration); + }}); - IndexMetaData index = IndexMetaData.builder("test1").settings(indexSettings).build(); - MetaData metaData = MetaData.builder() - .persistentSettings(Settings.builder().put("test1", "value1").build()) - .put(index, true) - .build(); + metaStateService.writeManifestAndCleanup("second manifest write", manifest); + metaStateService.cleanupGlobalState(globalGeneration); + metaStateService.cleanupIndex(index.getIndex(), indexGeneration); - metaStateService.writeGlobalState("test_write", metaData); - metaStateService.writeIndex("test_write", index); + manifestAndMetaData = metaStateService.loadFullState(); + assertThat(manifestAndMetaData.v1(), equalTo(manifest)); - MetaData loadedState = metaStateService.loadFullState(); - assertThat(loadedState.persistentSettings(), equalTo(metaData.persistentSettings())); - assertThat(loadedState.hasIndex("test1"), equalTo(true)); - assertThat(loadedState.index("test1"), equalTo(index)); - } + loadedMetaData = manifestAndMetaData.v2(); + assertThat(loadedMetaData.persistentSettings(), equalTo(newMetaData.persistentSettings())); + assertThat(loadedMetaData.hasIndex("test1"), equalTo(true)); + assertThat(loadedMetaData.index("test1"), equalTo(index)); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 23380f9c171f8..82c130be60e1c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -194,7 +194,7 @@ public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws public static void write(ShardStateMetaData shardStateMetaData, Path... shardPaths) throws IOException { - ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths); + ShardStateMetaData.FORMAT.writeAndCleanup(shardStateMetaData, shardPaths); } public static Engine getEngineFromShard(IndexShard shard) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 6e34bb03860c5..93e0515c1cde7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -401,7 +401,7 @@ private void writeIndexState() throws IOException { // create _state of IndexMetaData try(NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment)) { final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex()); - IndexMetaData.FORMAT.write(indexMetaData, paths); + IndexMetaData.FORMAT.writeAndCleanup(indexMetaData, paths); logger.info("--> index metadata persisted to {} ", Arrays.toString(paths)); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java index fda2f8ef7d039..dd247d8d7fc97 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java @@ -43,7 +43,8 @@ public void testLoadShardPath() throws IOException { ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); + ShardStateMetaData.FORMAT.writeAndCleanup( + new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)); assertEquals(path, shardPath.getDataPath()); assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID()); @@ -62,7 +63,8 @@ public void testFailLoadShardPathOnMultiState() throws IOException { ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); assumeTrue("This test tests multi data.path but we only got one", paths.length > 1); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); + ShardStateMetaData.FORMAT.writeAndCleanup( + new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); assertThat(e.getMessage(), containsString("more than one shard state found")); @@ -77,7 +79,8 @@ public void testFailLoadShardPathIndexUUIDMissmatch() throws IOException { ShardId shardId = new ShardId("foo", "foobar", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); + ShardStateMetaData.FORMAT.writeAndCleanup( + new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); assertThat(e.getMessage(), containsString("expected: foobar on shard path")); @@ -124,7 +127,8 @@ public void testGetRootPaths() throws IOException { ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); + ShardStateMetaData.FORMAT.writeAndCleanup( + new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), indexSettings, nodeSettings)); boolean found = false; diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index cfac866895f0e..60aaf76b27e02 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -232,12 +232,12 @@ public void testDeleteIndexStore() throws Exception { } GatewayMetaState gwMetaState = getInstanceFromNode(GatewayMetaState.class); - MetaData meta = gwMetaState.loadMetaState(); + MetaData meta = gwMetaState.loadMetaData(); assertNotNull(meta); assertNotNull(meta.index("test")); assertAcked(client().admin().indices().prepareDelete("test")); - meta = gwMetaState.loadMetaState(); + meta = gwMetaState.loadMetaData(); assertNotNull(meta); assertNull(meta.index("test"));