Skip to content

Commit c79b276

Browse files
Store Pending Deletions Fix (#40345)
FilterDirectory.getPendingDeletions does not delegate, fixed temporarily by overriding in StoreDirectory. This in turn caused duplicate file name use after a trimUnsafeCommits had been done, since a new IndexWriter would not consider the pending deletes in IndexFileDeleter. This should only happen on windows (AFAIK). Reenabled doing index updates for all tests using IndexShardTests.indexOnReplicaWithGaps (which could fail due to above when using mocked WindowsFS). Added getPendingDeletions delegation to all elasticsearch FilterDirectory subclasses that were not trivial test-only overrides to minimize the risk of hitting this issue in another case.
1 parent cb67585 commit c79b276

File tree

9 files changed

+81
-17
lines changed

9 files changed

+81
-17
lines changed

plugins/store-smb/src/main/java/org/elasticsearch/index/store/SmbDirectoryWrapper.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919

2020
package org.elasticsearch.index.store;
2121

22-
import java.io.FilterOutputStream;
23-
import java.io.IOException;
24-
import java.nio.channels.Channels;
25-
import java.nio.file.Files;
26-
import java.nio.file.StandardOpenOption;
2722
import org.apache.lucene.store.FSDirectory;
2823
import org.apache.lucene.store.FilterDirectory;
2924
import org.apache.lucene.store.IOContext;
3025
import org.apache.lucene.store.IndexOutput;
3126
import org.apache.lucene.store.OutputStreamIndexOutput;
3227

28+
import java.io.FilterOutputStream;
29+
import java.io.IOException;
30+
import java.nio.channels.Channels;
31+
import java.nio.file.Files;
32+
import java.nio.file.StandardOpenOption;
33+
import java.util.Set;
34+
3335
/**
3436
* This class is used to wrap an existing {@link org.apache.lucene.store.FSDirectory} so that
3537
* the new shard segment files will be opened for Read and Write access.
@@ -78,4 +80,10 @@ public void write(byte[] b, int offset, int length) throws IOException {
7880
CHUNK_SIZE);
7981
}
8082
}
83+
84+
// temporary override until LUCENE-8735 is integrated
85+
@Override
86+
public Set<String> getPendingDeletions() throws IOException {
87+
return in.getPendingDeletions();
88+
}
8189
}

server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.Closeable;
3434
import java.io.IOException;
3535
import java.util.Collection;
36+
import java.util.Set;
3637
import java.util.concurrent.atomic.AtomicBoolean;
3738

3839
final class LocalShardSnapshot implements Closeable {
@@ -116,6 +117,12 @@ public Lock obtainLock(String name) throws IOException {
116117
public void close() throws IOException {
117118
throw new UnsupportedOperationException("nobody should close this directory wrapper");
118119
}
120+
121+
// temporary override until LUCENE-8735 is integrated
122+
@Override
123+
public Set<String> getPendingDeletions() throws IOException {
124+
return in.getPendingDeletions();
125+
}
119126
};
120127
}
121128

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
261261
assert index.getFileDetails(dest).recovered() == l : index.getFileDetails(dest).toString();
262262
}
263263
}
264+
265+
// temporary override until LUCENE-8735 is integrated
266+
@Override
267+
public Set<String> getPendingDeletions() throws IOException {
268+
return in.getPendingDeletions();
269+
}
264270
}
265271

266272
/**

server/src/main/java/org/elasticsearch/index/store/ByteSizeCachingDirectory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.UncheckedIOException;
3333
import java.nio.file.AccessDeniedException;
3434
import java.nio.file.NoSuchFileException;
35+
import java.util.Set;
3536

3637
final class ByteSizeCachingDirectory extends FilterDirectory {
3738

@@ -180,4 +181,9 @@ public void deleteFile(String name) throws IOException {
180181
}
181182
}
182183

184+
// temporary override until LUCENE-8735 is integrated
185+
@Override
186+
public Set<String> getPendingDeletions() throws IOException {
187+
return in.getPendingDeletions();
188+
}
183189
}

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,13 @@ private void innerClose() throws IOException {
734734
public String toString() {
735735
return "store(" + in.toString() + ")";
736736
}
737+
738+
@Override
739+
public Set<String> getPendingDeletions() throws IOException {
740+
// FilterDirectory.getPendingDeletions does not delegate, working around it here.
741+
// to be removed once fixed in FilterDirectory.
742+
return unwrap(this).getPendingDeletions();
743+
}
737744
}
738745

739746
/**

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
556556

557557
// most of the time this is large enough that most of the time there will be at least one gap
558558
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
559-
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);
559+
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
560560

561561
final int maxSeqNo = result.maxSeqNo;
562562
final boolean gap = result.gap;
@@ -1104,7 +1104,7 @@ public void testGlobalCheckpointSync() throws IOException {
11041104
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
11051105
final IndexShard indexShard = newStartedShard(false);
11061106
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
1107-
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), true);
1107+
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
11081108

11091109
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
11101110
final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
@@ -1170,9 +1170,7 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt
11701170

11711171
// most of the time this is large enough that most of the time there will be at least one gap
11721172
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
1173-
// todo: all tests should run with allowUpdates=true, but this specific test sometimes fails during lucene commit when updates are
1174-
// added (seed = F37E9647ABE5928)
1175-
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);
1173+
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
11761174

11771175
final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
11781176
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
@@ -1215,7 +1213,7 @@ public void onFailure(final Exception e) {
12151213
}
12161214
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates));
12171215
// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
1218-
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()), false);
1216+
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
12191217
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
12201218
closeShard(indexShard, false);
12211219
}
@@ -1473,6 +1471,12 @@ public String[] listAll() throws IOException {
14731471
return super.listAll();
14741472
}
14751473
}
1474+
1475+
// temporary override until LUCENE-8735 is integrated
1476+
@Override
1477+
public Set<String> getPendingDeletions() throws IOException {
1478+
return in.getPendingDeletions();
1479+
}
14761480
};
14771481

14781482
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
@@ -3126,23 +3130,21 @@ class Result {
31263130
* @param indexShard the shard
31273131
* @param operations the number of operations
31283132
* @param offset the starting sequence number
3129-
* @param allowUpdates whether updates should be added.
31303133
* @return a pair of the maximum sequence number and whether or not a gap was introduced
31313134
* @throws IOException if an I/O exception occurs while indexing on the shard
31323135
*/
31333136
private Result indexOnReplicaWithGaps(
31343137
final IndexShard indexShard,
31353138
final int operations,
3136-
final int offset,
3137-
boolean allowUpdates) throws IOException {
3139+
final int offset) throws IOException {
31383140
int localCheckpoint = offset;
31393141
int max = offset;
31403142
boolean gap = false;
31413143
Set<String> ids = new HashSet<>();
31423144
for (int i = offset + 1; i < operations; i++) {
31433145
if (!rarely() || i == operations - 1) { // last operation can't be a gap as it's not a gap anymore
31443146
final String id = ids.isEmpty() || randomBoolean() ? Integer.toString(i) : randomFrom(ids);
3145-
if (allowUpdates && ids.add(id) == false) { // this is an update
3147+
if (ids.add(id) == false) { // this is an update
31463148
indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
31473149
}
31483150
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id,
@@ -3557,7 +3559,7 @@ public void testSupplyTombstoneDoc() throws Exception {
35573559

35583560
public void testResetEngine() throws Exception {
35593561
IndexShard shard = newStartedShard(false);
3560-
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()), false);
3562+
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
35613563
final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
35623564
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
35633565
Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
@@ -3597,7 +3599,7 @@ public void testResetEngine() throws Exception {
35973599

35983600
public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
35993601
final IndexShard replica = newStartedShard(false);
3600-
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()), false);
3602+
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
36013603

36023604
final int nbTermUpdates = randomIntBetween(1, 5);
36033605

server/src/test/java/org/elasticsearch/index/store/ByteSizeCachingDirectoryTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.test.ESTestCase;
2929

3030
import java.io.IOException;
31+
import java.util.Set;
3132

3233
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
3334
public class ByteSizeCachingDirectoryTests extends ESTestCase {
@@ -45,6 +46,12 @@ public long fileLength(String name) throws IOException {
4546
numFileLengthCalls++;
4647
return super.fileLength(name);
4748
}
49+
50+
// temporary override until LUCENE-8735 is integrated
51+
@Override
52+
public Set<String> getPendingDeletions() throws IOException {
53+
return in.getPendingDeletions();
54+
}
4855
}
4956

5057
public void testBasics() throws IOException {

server/src/test/java/org/elasticsearch/index/store/StoreTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@
4343
import org.apache.lucene.store.BaseDirectoryWrapper;
4444
import org.apache.lucene.store.ChecksumIndexInput;
4545
import org.apache.lucene.store.Directory;
46+
import org.apache.lucene.store.FilterDirectory;
4647
import org.apache.lucene.store.IOContext;
4748
import org.apache.lucene.store.IndexInput;
4849
import org.apache.lucene.store.IndexOutput;
50+
import org.apache.lucene.store.NIOFSDirectory;
4951
import org.apache.lucene.store.RAMDirectory;
5052
import org.apache.lucene.util.BytesRef;
5153
import org.apache.lucene.util.TestUtil;
@@ -1143,4 +1145,16 @@ IndexWriter openWriter(Store store) throws IOException {
11431145
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
11441146
return new IndexWriter(store.directory(), iwc);
11451147
}
1148+
1149+
public void testGetPendingFiles() throws IOException {
1150+
final ShardId shardId = new ShardId("index", "_na_", 1);
1151+
final String testfile = "testfile";
1152+
try (Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId))) {
1153+
store.directory().createOutput(testfile, IOContext.DEFAULT).close();
1154+
try (IndexInput input = store.directory().openInput(testfile, IOContext.DEFAULT)) {
1155+
store.directory().deleteFile(testfile);
1156+
assertEquals(FilterDirectory.unwrap(store.directory()).getPendingDeletions(), store.directory().getPendingDeletions());
1157+
}
1158+
}
1159+
}
11461160
}

test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.nio.file.Path;
5555
import java.util.Arrays;
5656
import java.util.Random;
57+
import java.util.Set;
5758

5859
public class MockFSDirectoryService extends FsDirectoryService {
5960

@@ -187,6 +188,12 @@ public synchronized void crash() throws IOException {
187188
super.crash();
188189
}
189190
}
191+
192+
// temporary override until LUCENE-8735 is integrated
193+
@Override
194+
public Set<String> getPendingDeletions() throws IOException {
195+
return in.getPendingDeletions();
196+
}
190197
}
191198

192199
final class CloseableDirectory implements Closeable {

0 commit comments

Comments
 (0)