Skip to content

Commit 8e3399d

Browse files
authored
Run CheckIndex on metadata index before loading (#73239) (#74173)
The metadata index is small and important and only read at startup. Today we rely on Lucene to spot if any of its components is corrupt, but Lucene does not necessarily verify all checksums in order to catch a corruption. With this commit we run `CheckIndex` on the metadata index first, and fail on startup if a corruption is detected. Closes #29358
1 parent d4fceb5 commit 8e3399d

File tree

3 files changed

+106
-34
lines changed

3 files changed

+106
-34
lines changed

server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.lucene.document.Field;
1717
import org.apache.lucene.document.StoredField;
1818
import org.apache.lucene.document.StringField;
19+
import org.apache.lucene.index.CheckIndex;
1920
import org.apache.lucene.index.DirectoryReader;
2021
import org.apache.lucene.index.IndexNotFoundException;
2122
import org.apache.lucene.index.IndexWriter;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.common.bytes.BytesReference;
4748
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
4849
import org.elasticsearch.common.io.Streams;
50+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
4951
import org.elasticsearch.core.Releasable;
5052
import org.elasticsearch.core.Releasables;
5153
import org.elasticsearch.common.logging.Loggers;
@@ -70,6 +72,8 @@
7072
import java.io.Closeable;
7173
import java.io.IOError;
7274
import java.io.IOException;
75+
import java.io.PrintStream;
76+
import java.nio.charset.StandardCharsets;
7377
import java.nio.file.Files;
7478
import java.nio.file.Path;
7579
import java.util.ArrayList;
@@ -306,6 +310,14 @@ public static void overrideVersion(Version newVersion, Path... dataPaths) throws
306310
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
307311
*/
308312
public OnDiskState loadBestOnDiskState() throws IOException {
313+
return loadBestOnDiskState(true);
314+
}
315+
316+
/**
317+
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
318+
* @param checkClean whether to check the index for corruption before loading, only for tests
319+
*/
320+
OnDiskState loadBestOnDiskState(boolean checkClean) throws IOException {
309321
String committedClusterUuid = null;
310322
Path committedClusterUuidPath = null;
311323
OnDiskState bestOnDiskState = OnDiskState.NO_ON_DISK_STATE;
@@ -317,39 +329,63 @@ public OnDiskState loadBestOnDiskState() throws IOException {
317329
for (final Path dataPath : dataPaths) {
318330
final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
319331
if (Files.exists(indexPath)) {
320-
try (Directory directory = createDirectory(indexPath);
321-
DirectoryReader directoryReader = DirectoryReader.open(directory)) {
322-
final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);
332+
try (Directory directory = createDirectory(indexPath)) {
333+
if (checkClean) {
334+
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
335+
final boolean isClean;
336+
try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8.name());
337+
CheckIndex checkIndex = new CheckIndex(directory)) {
338+
checkIndex.setInfoStream(printStream);
339+
checkIndex.setChecksumsOnly(true);
340+
isClean = checkIndex.checkIndex().clean;
341+
}
323342

324-
if (nodeId.equals(onDiskState.nodeId) == false) {
325-
throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId +
326-
"] in [" + dataPath + "] but expected [" + nodeId + "]");
343+
if (isClean == false) {
344+
if (logger.isErrorEnabled()) {
345+
for (final String line : outputStream.bytes().utf8ToString().split("\\r?\\n")) {
346+
logger.error("checkIndex: {}", line);
347+
}
348+
}
349+
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
350+
"] has been changed by an external force after it was last written by Elasticsearch and is now unreadable");
351+
}
327352
}
353+
}
328354

329-
if (onDiskState.metadata.clusterUUIDCommitted()) {
330-
if (committedClusterUuid == null) {
331-
committedClusterUuid = onDiskState.metadata.clusterUUID();
332-
committedClusterUuidPath = dataPath;
333-
} else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
334-
throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
335-
"] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
336-
+ dataPath + "]");
355+
356+
try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
357+
final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);
358+
359+
if (nodeId.equals(onDiskState.nodeId) == false) {
360+
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
361+
"] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]");
337362
}
338-
}
339363

340-
if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
341-
maxCurrentTermOnDiskState = onDiskState;
342-
}
364+
if (onDiskState.metadata.clusterUUIDCommitted()) {
365+
if (committedClusterUuid == null) {
366+
committedClusterUuid = onDiskState.metadata.clusterUUID();
367+
committedClusterUuidPath = dataPath;
368+
} else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
369+
throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
370+
"] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
371+
+ dataPath + "]");
372+
}
373+
}
374+
375+
if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
376+
maxCurrentTermOnDiskState = onDiskState;
377+
}
343378

344-
long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
345-
long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
346-
if (bestOnDiskState.empty()
347-
|| acceptedTerm > maxAcceptedTerm
348-
|| (acceptedTerm == maxAcceptedTerm
379+
long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
380+
long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
381+
if (bestOnDiskState.empty()
382+
|| acceptedTerm > maxAcceptedTerm
383+
|| (acceptedTerm == maxAcceptedTerm
349384
&& (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion
350-
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
351-
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
352-
bestOnDiskState = onDiskState;
385+
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
386+
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
387+
bestOnDiskState = onDiskState;
388+
}
353389
}
354390
} catch (IndexNotFoundException e) {
355391
logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);

server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void testDataOnlyNodePersistence() throws Exception {
371371
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
372372
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
373373
CoordinationMetadata persistedCoordinationMetadata =
374-
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
374+
persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
375375
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
376376
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
377377
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
@@ -387,12 +387,12 @@ public void testDataOnlyNodePersistence() throws Exception {
387387
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();
388388

389389
assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
390-
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
390+
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
391391
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
392392
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
393393
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
394394
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
395-
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());
395+
assertTrue(persistedClusterStateService.loadBestOnDiskState(false).metadata.clusterUUIDCommitted());
396396

397397
// generate a series of updates and check if batching works
398398
final String indexName = randomAlphaOfLength(10);

server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.index.IndexWriter;
1414
import org.apache.lucene.index.IndexWriterConfig;
1515
import org.apache.lucene.index.Term;
16+
import org.apache.lucene.mockfile.ExtrasFS;
1617
import org.apache.lucene.store.Directory;
1718
import org.apache.lucene.store.FilterDirectory;
1819
import org.apache.lucene.store.IOContext;
@@ -40,12 +41,15 @@
4041
import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
4142
import org.elasticsearch.index.Index;
4243
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
44+
import org.elasticsearch.test.CorruptionUtils;
4345
import org.elasticsearch.test.ESTestCase;
4446
import org.elasticsearch.test.MockLogAppender;
4547
import org.elasticsearch.test.junit.annotations.TestLogging;
4648

4749
import java.io.IOError;
4850
import java.io.IOException;
51+
import java.nio.file.DirectoryStream;
52+
import java.nio.file.Files;
4953
import java.nio.file.Path;
5054
import java.util.ArrayList;
5155
import java.util.Arrays;
@@ -56,12 +60,16 @@
5660
import java.util.concurrent.atomic.AtomicLong;
5761
import java.util.stream.Collectors;
5862
import java.util.stream.Stream;
63+
import java.util.stream.StreamSupport;
5964

65+
import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME;
6066
import static org.hamcrest.Matchers.allOf;
6167
import static org.hamcrest.Matchers.containsString;
68+
import static org.hamcrest.Matchers.endsWith;
6269
import static org.hamcrest.Matchers.equalTo;
6370
import static org.hamcrest.Matchers.lessThan;
6471
import static org.hamcrest.Matchers.nullValue;
72+
import static org.hamcrest.Matchers.startsWith;
6573

6674
public class PersistedClusterStateServiceTests extends ESTestCase {
6775

@@ -79,7 +87,7 @@ public void testPersistsAndReloadsTerm() throws IOException {
7987
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(0L));
8088
try (Writer writer = persistedClusterStateService.createWriter()) {
8189
writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE);
82-
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
90+
assertThat(persistedClusterStateService.loadBestOnDiskState(false).currentTerm, equalTo(newTerm));
8391
}
8492

8593
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
@@ -218,8 +226,12 @@ public void testFailsOnMismatchedNodeIds() throws IOException {
218226
.toArray(Path[]::new), nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
219227
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L
220228
).loadBestOnDiskState()).getMessage();
221-
assertThat(message,
222-
allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1])));
229+
assertThat(message, allOf(
230+
containsString("the index containing the cluster metadata under the data path"),
231+
containsString("belongs to a node with ID"),
232+
containsString("but this node's ID is"),
233+
containsString(nodeIds[0]),
234+
containsString(nodeIds[1])));
223235
assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2),
224236
Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString())));
225237
}
@@ -315,7 +327,7 @@ public void testFailsIfFreshestStateIsInStaleTerm() throws IOException {
315327
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) {
316328
try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) {
317329
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService(nodeEnvironment)
318-
.loadBestOnDiskState();
330+
.loadBestOnDiskState(false);
319331
final ClusterState clusterState = clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
320332
writeState(writer, onDiskState.currentTerm, ClusterState.builder(clusterState)
321333
.metadata(Metadata.builder(clusterState.metadata()).version(2)
@@ -851,6 +863,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
851863
}
852864
}
853865

866+
public void testFailsIfCorrupt() throws IOException {
867+
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
868+
final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
869+
870+
try (Writer writer = persistedClusterStateService.createWriter()) {
871+
writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
872+
}
873+
874+
try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(nodeEnvironment.nodeDataPaths()[0].resolve("_state"))) {
875+
CorruptionUtils.corruptFile(random(), randomFrom(StreamSupport
876+
.stream(directoryStream.spliterator(), false)
877+
.filter(p -> {
878+
final String filename = p.getFileName().toString();
879+
return ExtrasFS.isExtra(filename) == false && filename.equals(WRITE_LOCK_NAME) == false;
880+
})
881+
.collect(Collectors.toList())));
882+
}
883+
884+
assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadBestOnDiskState).getMessage(), allOf(
885+
startsWith("the index containing the cluster metadata under the data path ["),
886+
endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable")));
887+
}
888+
}
889+
854890
private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
855891
PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
856892
throws IllegalAccessException, IOException {
@@ -896,7 +932,7 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException
896932
}
897933

898934
private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException {
899-
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();
935+
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false);
900936
return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
901937
}
902938

0 commit comments

Comments
 (0)