Skip to content

Commit 54b6885

Browse files
authored
Check index under the store metadata lock (#27768)
Today when we get a metadata snapshot directly from a store directory, we acquire a metadata lock, then acquire an IndexWriter lock. However, we create a CheckIndex in IndexShard without acquiring the metadata lock first. This causes a recovery failed because the IndexWriter lock can be still held by method snapshotStoreMetadata. This commit makes sure to create a CheckIndex under the metadata lock. Closes #24481 Closes #27731 Relates #24787
1 parent 4cbbe3e commit 54b6885

File tree

6 files changed

+123
-38
lines changed

6 files changed

+123
-38
lines changed

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1899,7 +1899,7 @@ public void noopUpdate(String type) {
18991899
internalIndexingStats.noopUpdate(type);
19001900
}
19011901

1902-
private void checkIndex() throws IOException {
1902+
void checkIndex() throws IOException {
19031903
if (store.tryIncRef()) {
19041904
try {
19051905
doCheckIndex();
@@ -1938,29 +1938,25 @@ private void doCheckIndex() throws IOException {
19381938
}
19391939
} else {
19401940
// full checkindex
1941-
try (CheckIndex checkIndex = new CheckIndex(store.directory())) {
1942-
checkIndex.setInfoStream(out);
1943-
CheckIndex.Status status = checkIndex.checkIndex();
1944-
out.flush();
1945-
1946-
if (!status.clean) {
1947-
if (state == IndexShardState.CLOSED) {
1948-
// ignore if closed....
1949-
return;
1941+
final CheckIndex.Status status = store.checkIndex(out);
1942+
out.flush();
1943+
if (!status.clean) {
1944+
if (state == IndexShardState.CLOSED) {
1945+
// ignore if closed....
1946+
return;
1947+
}
1948+
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
1949+
if ("fix".equals(checkIndexOnStartup)) {
1950+
if (logger.isDebugEnabled()) {
1951+
logger.debug("fixing index, writing new segments file ...");
19501952
}
1951-
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
1952-
if ("fix".equals(checkIndexOnStartup)) {
1953-
if (logger.isDebugEnabled()) {
1954-
logger.debug("fixing index, writing new segments file ...");
1955-
}
1956-
checkIndex.exorciseIndex(status);
1957-
if (logger.isDebugEnabled()) {
1958-
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
1959-
}
1960-
} else {
1961-
// only throw a failure if we are not going to fix the index
1962-
throw new IllegalStateException("index check failure but can't fix it");
1953+
store.exorciseIndex(status);
1954+
if (logger.isDebugEnabled()) {
1955+
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
19631956
}
1957+
} else {
1958+
// only throw a failure if we are not going to fix the index
1959+
throw new IllegalStateException("index check failure but can't fix it");
19641960
}
19651961
}
19661962
}

core/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.apache.logging.log4j.util.Supplier;
2525
import org.apache.lucene.codecs.CodecUtil;
26+
import org.apache.lucene.index.CheckIndex;
2627
import org.apache.lucene.index.CorruptIndexException;
2728
import org.apache.lucene.index.IndexCommit;
2829
import org.apache.lucene.index.IndexFileNames;
@@ -86,6 +87,7 @@
8687
import java.io.FileNotFoundException;
8788
import java.io.IOException;
8889
import java.io.InputStream;
90+
import java.io.PrintStream;
8991
import java.nio.file.AccessDeniedException;
9092
import java.nio.file.NoSuchFileException;
9193
import java.nio.file.Path;
@@ -341,6 +343,33 @@ public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2) {
341343

342344
}
343345

346+
/**
347+
* Checks and returns the status of the existing index in this store.
348+
*
349+
* @param out where infoStream messages should go. See {@link CheckIndex#setInfoStream(PrintStream)}
350+
*/
351+
public CheckIndex.Status checkIndex(PrintStream out) throws IOException {
352+
metadataLock.writeLock().lock();
353+
try (CheckIndex checkIndex = new CheckIndex(directory)) {
354+
checkIndex.setInfoStream(out);
355+
return checkIndex.checkIndex();
356+
} finally {
357+
metadataLock.writeLock().unlock();
358+
}
359+
}
360+
361+
/**
362+
* Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
363+
*/
364+
public void exorciseIndex(CheckIndex.Status status) throws IOException {
365+
metadataLock.writeLock().lock();
366+
try (CheckIndex checkIndex = new CheckIndex(directory)) {
367+
checkIndex.exorciseIndex(status);
368+
} finally {
369+
metadataLock.writeLock().unlock();
370+
}
371+
}
372+
344373
public StoreStats stats() throws IOException {
345374
ensureOpen();
346375
return statsCache.getOrRefresh();

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2449,6 +2449,71 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
24492449
closeShards(newShard);
24502450
}
24512451

2452+
/**
2453+
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
2454+
* and checking index concurrently. This should always be possible without any exception.
2455+
*/
2456+
public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
2457+
final boolean isPrimary = randomBoolean();
2458+
IndexShard indexShard = newStartedShard(isPrimary);
2459+
final long numDocs = between(10, 100);
2460+
for (long i = 0; i < numDocs; i++) {
2461+
indexDoc(indexShard, "doc", Long.toString(i), "{\"foo\" : \"bar\"}");
2462+
if (randomBoolean()) {
2463+
indexShard.refresh("test");
2464+
}
2465+
}
2466+
indexShard.flush(new FlushRequest());
2467+
closeShards(indexShard);
2468+
2469+
final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
2470+
isPrimary ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
2471+
);
2472+
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
2473+
.settings(Settings.builder()
2474+
.put(indexShard.indexSettings.getSettings())
2475+
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
2476+
.build();
2477+
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
2478+
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer());
2479+
2480+
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
2481+
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
2482+
AtomicBoolean stop = new AtomicBoolean(false);
2483+
CountDownLatch latch = new CountDownLatch(1);
2484+
Thread snapshotter = new Thread(() -> {
2485+
latch.countDown();
2486+
while (stop.get() == false) {
2487+
try {
2488+
Store.MetadataSnapshot readMeta = newShard.snapshotStoreMetadata();
2489+
assertThat(readMeta.getNumDocs(), equalTo(numDocs));
2490+
assertThat(storeFileMetaDatas.recoveryDiff(readMeta).different.size(), equalTo(0));
2491+
assertThat(storeFileMetaDatas.recoveryDiff(readMeta).missing.size(), equalTo(0));
2492+
assertThat(storeFileMetaDatas.recoveryDiff(readMeta).identical.size(), equalTo(storeFileMetaDatas.size()));
2493+
} catch (IOException e) {
2494+
throw new AssertionError(e);
2495+
}
2496+
}
2497+
});
2498+
snapshotter.start();
2499+
2500+
if (isPrimary) {
2501+
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(),
2502+
getFakeDiscoNode(newShard.routingEntry().currentNodeId()), null));
2503+
} else {
2504+
newShard.markAsRecovering("peer", new RecoveryState(newShard.routingEntry(),
2505+
getFakeDiscoNode(newShard.routingEntry().currentNodeId()), getFakeDiscoNode(newShard.routingEntry().currentNodeId())));
2506+
}
2507+
int iters = iterations(10, 100);
2508+
latch.await();
2509+
for (int i = 0; i < iters; i++) {
2510+
newShard.checkIndex();
2511+
}
2512+
assertTrue(stop.compareAndSet(false, true));
2513+
snapshotter.join();
2514+
closeShards(newShard);
2515+
}
2516+
24522517
class Result {
24532518
private final int localCheckpoint;
24542519
private final int maxSeqNo;

core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -204,16 +204,13 @@ public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
204204
if (!Lucene.indexExists(store.directory()) && indexShard.state() == IndexShardState.STARTED) {
205205
return;
206206
}
207-
try (CheckIndex checkIndex = new CheckIndex(store.directory())) {
208-
BytesStreamOutput os = new BytesStreamOutput();
209-
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
210-
checkIndex.setInfoStream(out);
211-
out.flush();
212-
CheckIndex.Status status = checkIndex.checkIndex();
213-
if (!status.clean) {
214-
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
215-
throw new IOException("index check failure");
216-
}
207+
BytesStreamOutput os = new BytesStreamOutput();
208+
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
209+
CheckIndex.Status status = store.checkIndex(out);
210+
out.flush();
211+
if (!status.clean) {
212+
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
213+
throw new IOException("index check failure");
217214
}
218215
} catch (Exception e) {
219216
exception.add(e);

core/src/test/java/org/elasticsearch/index/store/StoreTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,4 +1070,5 @@ public Directory newDirectory() throws IOException {
10701070
}
10711071
store.close();
10721072
}
1073+
10731074
}

test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryService.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,14 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) {
119119
if (!Lucene.indexExists(dir)) {
120120
return;
121121
}
122-
try (CheckIndex checkIndex = new CheckIndex(dir)) {
122+
try {
123123
BytesStreamOutput os = new BytesStreamOutput();
124124
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
125-
checkIndex.setInfoStream(out);
125+
CheckIndex.Status status = store.checkIndex(out);
126126
out.flush();
127-
CheckIndex.Status status = checkIndex.checkIndex();
128127
if (!status.clean) {
129128
ESTestCase.checkIndexFailed = true;
130-
logger.warn("check index [failure] index files={}\n{}",
131-
Arrays.toString(dir.listAll()),
132-
os.bytes().utf8ToString());
129+
logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
133130
throw new IOException("index check failure");
134131
} else {
135132
if (logger.isDebugEnabled()) {

0 commit comments

Comments
 (0)