Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public static boolean isCorruptMarkerFileIsPresent(final Directory directory) th

final String[] files = directory.listAll();
for (String file : files) {
if (file.startsWith(Store.CORRUPTED)) {
if (file.startsWith(Store.CORRUPTED_MARKER_NAME_PREFIX)) {
found = true;
break;
}
Expand All @@ -232,7 +232,7 @@ protected void dropCorruptMarkerFiles(Terminal terminal, Path path, Directory di
}
String[] files = directory.listAll();
for (String file : files) {
if (file.startsWith(Store.CORRUPTED)) {
if (file.startsWith(Store.CORRUPTED_MARKER_NAME_PREFIX)) {
directory.deleteFile(file);

terminal.println("Deleted corrupt marker " + file + " from " + path);
Expand Down
64 changes: 23 additions & 41 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public static final Setting<Boolean> FORCE_RAM_TERM_DICT = Setting.boolSetting("index.force_memory_term_dictionary", false,
Property.IndexScope, Property.Deprecated);
static final String CODEC = "store";
static final int VERSION_WRITE_THROWABLE= 2; // we write throwable since 2.0
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
static final int VERSION_START = 0;
static final int VERSION = VERSION_WRITE_THROWABLE;
static final int CORRUPTED_MARKER_CODEC_VERSION = 2;
// public is for test purposes
public static final String CORRUPTED = "corrupted_";
public static final String CORRUPTED_MARKER_NAME_PREFIX = "corrupted_";
public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);

Expand Down Expand Up @@ -447,7 +444,7 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId
Logger logger) throws IOException {
try (ShardLock lock = shardLocker.lock(shardId, "read metadata snapshot", TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
Expand All @@ -468,7 +465,7 @@ public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnviron
Logger logger) throws IOException, ShardLockObtainFailedException {
try (ShardLock lock = shardLocker.lock(shardId, "open index", TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
failIfCorrupted(dir);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
}
Expand Down Expand Up @@ -549,7 +546,7 @@ public boolean isMarkedCorrupted() throws IOException {
*/
final String[] files = directory().listAll();
for (String file : files) {
if (file.startsWith(CORRUPTED)) {
if (file.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
return true;
}
}
Expand All @@ -565,7 +562,7 @@ public void removeCorruptionMarker() throws IOException {
IOException firstException = null;
final String[] files = directory.listAll();
for (String file : files) {
if (file.startsWith(CORRUPTED)) {
if (file.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
try {
directory.deleteFile(file);
} catch (IOException ex) {
Expand All @@ -584,40 +581,25 @@ public void removeCorruptionMarker() throws IOException {

public void failIfCorrupted() throws IOException {
ensureOpen();
failIfCorrupted(directory, shardId);
failIfCorrupted(directory);
}

private static void failIfCorrupted(Directory directory, ShardId shardId) throws IOException {
private static void failIfCorrupted(Directory directory) throws IOException {
final String[] files = directory.listAll();
List<CorruptIndexException> ex = new ArrayList<>();
for (String file : files) {
if (file.startsWith(CORRUPTED)) {
if (file.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
try (ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
int version = CodecUtil.checkHeader(input, CODEC, VERSION_START, VERSION);

if (version == VERSION_WRITE_THROWABLE) {
final int size = input.readVInt();
final byte[] buffer = new byte[size];
input.readBytes(buffer, 0, buffer.length);
StreamInput in = StreamInput.wrap(buffer);
Exception t = in.readException();
if (t instanceof CorruptIndexException) {
ex.add((CorruptIndexException) t);
} else {
ex.add(new CorruptIndexException(t.getMessage(), "preexisting_corruption", t));
}
CodecUtil.checkHeader(input, CODEC, CORRUPTED_MARKER_CODEC_VERSION, CORRUPTED_MARKER_CODEC_VERSION);
final int size = input.readVInt();
final byte[] buffer = new byte[size];
input.readBytes(buffer, 0, buffer.length);
StreamInput in = StreamInput.wrap(buffer);
Exception t = in.readException();
if (t instanceof CorruptIndexException) {
ex.add((CorruptIndexException) t);
} else {
assert version == VERSION_START || version == VERSION_STACK_TRACE;
String msg = input.readString();
StringBuilder builder = new StringBuilder(shardId.toString());
builder.append(" Preexisting corrupted index [");
builder.append(file).append("] caused by: ");
builder.append(msg);
if (version == VERSION_STACK_TRACE) {
builder.append(System.lineSeparator());
builder.append(input.readString());
}
ex.add(new CorruptIndexException(builder.toString(), "preexisting_corruption"));
ex.add(new CorruptIndexException(t.getMessage(), "preexisting_corruption", t));
}
CodecUtil.checkFooter(input);
}
Expand Down Expand Up @@ -653,7 +635,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) thr
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS)
|| existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)
|| existingFile.startsWith(CORRUPTED)) {
|| existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
Expand Down Expand Up @@ -1378,9 +1360,9 @@ public void deleteQuiet(String... files) {
public void markStoreCorrupted(IOException exception) throws IOException {
ensureOpen();
if (!isMarkedCorrupted()) {
String uuid = CORRUPTED + UUIDs.randomBase64UUID();
try (IndexOutput output = this.directory().createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, CODEC, VERSION);
final String corruptionMarkerName = CORRUPTED_MARKER_NAME_PREFIX + UUIDs.randomBase64UUID();
try (IndexOutput output = this.directory().createOutput(corruptionMarkerName, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, CODEC, CORRUPTED_MARKER_CODEC_VERSION);
BytesStreamOutput out = new BytesStreamOutput();
out.writeException(exception);
BytesReference bytes = out.bytes();
Expand All @@ -1391,7 +1373,7 @@ public void markStoreCorrupted(IOException exception) throws IOException {
} catch (IOException ex) {
logger.warn("Can't mark store as corrupted", ex);
}
directory().sync(Collections.singleton(uuid));
directory().sync(Collections.singleton(corruptionMarkerName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3095,7 +3095,7 @@ public void testIndexCheckOnStartup() throws Exception {
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED_MARKER_NAME_PREFIX)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
Expand Down Expand Up @@ -3171,7 +3171,7 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED_MARKER_NAME_PREFIX)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
Expand Down
69 changes: 13 additions & 56 deletions server/src/test/java/org/elasticsearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,17 @@

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

public class StoreTests extends ESTestCase {

Expand Down Expand Up @@ -977,65 +979,20 @@ public void testDeserializeCorruptionException() throws IOException {
store.close();
}

public void testCanReadOldCorruptionMarker() throws IOException {
public void testCorruptionMarkerVersionCheck() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));

CorruptIndexException exception = new CorruptIndexException("foo", "bar");
String uuid = Store.CORRUPTED + UUIDs.randomBase64UUID();
try (IndexOutput output = dir.createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, Store.CODEC, Store.VERSION_STACK_TRACE);
output.writeString(exception.getMessage());
output.writeString(ExceptionsHelper.stackTrace(exception));
CodecUtil.writeFooter(output);
}
try {
store.failIfCorrupted();
fail("should be corrupted");
} catch (CorruptIndexException e) {
assertThat(e.getMessage(), startsWith("[index][1] Preexisting corrupted index [" + uuid + "] caused by: foo (resource=bar)"));
assertTrue(e.getMessage().contains(ExceptionsHelper.stackTrace(exception)));
}

store.removeCorruptionMarker();

try (IndexOutput output = dir.createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, Store.CODEC, Store.VERSION_START);
output.writeString(exception.getMessage());
CodecUtil.writeFooter(output);
}
try {
store.failIfCorrupted();
fail("should be corrupted");
} catch (CorruptIndexException e) {
assertThat(e.getMessage(), startsWith("[index][1] Preexisting corrupted index [" + uuid + "] caused by: foo (resource=bar)"));
assertFalse(e.getMessage().contains(ExceptionsHelper.stackTrace(exception)));
}

store.removeCorruptionMarker();

try (IndexOutput output = dir.createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, Store.CODEC, Store.VERSION_START - 1); // corrupted header
CodecUtil.writeFooter(output);
}
try {
store.failIfCorrupted();
fail("should be too old");
} catch (IndexFormatTooOldException e) {
}

store.removeCorruptionMarker();
try (IndexOutput output = dir.createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, Store.CODEC, Store.VERSION+1); // corrupted header
CodecUtil.writeFooter(output);
}
try {
store.failIfCorrupted();
fail("should be too new");
} catch (IndexFormatTooNewException e) {
try (Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId))) {
final String corruptionMarkerName = Store.CORRUPTED_MARKER_NAME_PREFIX + UUIDs.randomBase64UUID();
try (IndexOutput output = dir.createOutput(corruptionMarkerName, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, Store.CODEC, Store.CORRUPTED_MARKER_CODEC_VERSION + randomFrom(1, 2, -1, -2, -3));
// we only need the header to trigger the exception
}
final IOException ioException = expectThrows(IOException.class, store::failIfCorrupted);
assertThat(ioException, anyOf(instanceOf(IndexFormatTooOldException.class), instanceOf(IndexFormatTooNewException.class)));
assertThat(ioException.getMessage(), containsString(corruptionMarkerName));
}
store.close();
}

public void testHistoryUUIDCanBeForced() throws IOException {
Expand Down