diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 4ff7204ebcea0..c64b075d01fb0 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -16,6 +16,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; +import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexWriter; @@ -46,6 +47,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.common.logging.Loggers; @@ -70,6 +72,8 @@ import java.io.Closeable; import java.io.IOError; import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -290,17 +294,46 @@ public static void overrideVersion(Version newVersion, Path... dataPaths) throws * Loads the available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found. */ public OnDiskState loadOnDiskState() throws IOException { + return loadOnDiskState(true); + } + + /** + * Loads the available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found. + * @param checkClean whether to check the index for corruption before loading, only for tests + */ + OnDiskState loadOnDiskState(boolean checkClean) throws IOException { OnDiskState onDiskState = OnDiskState.NO_ON_DISK_STATE; final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME); if (Files.exists(indexPath)) { - try (Directory directory = createDirectory(indexPath); - DirectoryReader directoryReader = DirectoryReader.open(directory)) { - onDiskState = loadOnDiskState(dataPath, directoryReader); + try (Directory directory = createDirectory(indexPath)) { + if (checkClean) { + try (BytesStreamOutput outputStream = new BytesStreamOutput()) { + final boolean isClean; + try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8); + CheckIndex checkIndex = new CheckIndex(directory)) { + checkIndex.setInfoStream(printStream); + checkIndex.setChecksumsOnly(true); + isClean = checkIndex.checkIndex().clean; + } - if (nodeId.equals(onDiskState.nodeId) == false) { - throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId + - "] in [" + dataPath + "] but expected [" + nodeId + "]"); + if (isClean == false) { + if (logger.isErrorEnabled()) { + outputStream.bytes().utf8ToString().lines().forEach(l -> logger.error("checkIndex: {}", l)); + } + throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath + + "] has been changed by an external force after it was last written by Elasticsearch and is now unreadable"); + } + } + } + + try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { + onDiskState = loadOnDiskState(dataPath, directoryReader); + + if (nodeId.equals(onDiskState.nodeId) == false) { + throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath + + "] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]"); + } } } catch (IndexNotFoundException e) { logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 04d6256fb1ed4..e030d5d1f6dee 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -370,7 +370,7 @@ public void testDataOnlyNodePersistence() throws Exception { assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(), not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration()))); CoordinationMetadata persistedCoordinationMetadata = - persistedClusterStateService.loadOnDiskState().metadata.coordinationMetadata(); + persistedClusterStateService.loadOnDiskState(false).metadata.coordinationMetadata(); assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(), equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration)); assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(), @@ -386,12 +386,12 @@ public void testDataOnlyNodePersistence() throws Exception { .clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build(); assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState()); - persistedCoordinationMetadata = persistedClusterStateService.loadOnDiskState().metadata.coordinationMetadata(); + persistedCoordinationMetadata = persistedClusterStateService.loadOnDiskState(false).metadata.coordinationMetadata(); assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(), equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration)); assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(), equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration)); - assertTrue(persistedClusterStateService.loadOnDiskState().metadata.clusterUUIDCommitted()); + assertTrue(persistedClusterStateService.loadOnDiskState(false).metadata.clusterUUIDCommitted()); // generate a series of updates and check if batching works final String indexName = randomAlphaOfLength(10); diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 80c0ea900e999..f90ad2faf7ae8 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; +import org.apache.lucene.mockfile.ExtrasFS; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; @@ -38,24 +39,32 @@ import org.elasticsearch.gateway.PersistedClusterStateService.Writer; import org.elasticsearch.index.Index; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOError; import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; public class PersistedClusterStateServiceTests extends ESTestCase { @@ -73,7 +82,7 @@ public void testPersistsAndReloadsTerm() throws IOException { assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(0L)); try (Writer writer = persistedClusterStateService.createWriter()) { writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE); - assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(newTerm)); + assertThat(persistedClusterStateService.loadOnDiskState(false).currentTerm, equalTo(newTerm)); } assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(newTerm)); @@ -638,6 +647,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException { } } + public void testFailsIfCorrupt() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createTempDir())) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + + try (Writer writer = persistedClusterStateService.createWriter()) { + writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE); + } + + try (DirectoryStream directoryStream = Files.newDirectoryStream(nodeEnvironment.nodeDataPath().resolve("_state"))) { + CorruptionUtils.corruptFile(random(), randomFrom(StreamSupport + .stream(directoryStream.spliterator(), false) + .filter(p -> { + final String filename = p.getFileName().toString(); + return ExtrasFS.isExtra(filename) == false && filename.equals(WRITE_LOCK_NAME) == false; + }) + .collect(Collectors.toList()))); + } + + assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadOnDiskState).getMessage(), allOf( + startsWith("the index containing the cluster metadata under the data path ["), + endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable"))); + } + } + private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState, PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, IOException { @@ -675,7 +708,7 @@ private NodeEnvironment newNodeEnvironment(Path dataPath) throws IOException { } private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException { - final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadOnDiskState(); + final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadOnDiskState(false); return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata); }