Skip to content

Commit 9533679

Browse files
committed
Fix double notify bug
1 parent 5b63507 commit 9533679

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,12 @@ public interface GlobalCheckpointListener {
9292
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
9393
if (closed) {
9494
executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
95+
return;
9596
}
9697
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
9798
// notify directly
98-
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
99+
executor.execute(() -> listener.accept(lastKnownGlobalCheckpoint, null));
100+
return;
99101
} else {
100102
if (listeners == null) {
101103
listeners = new ArrayList<>();
@@ -112,6 +114,10 @@ public void close() throws IOException {
112114
notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
113115
}
114116

117+
synchronized int pendingListeners() {
118+
return listeners == null ? 0 : listeners.size();
119+
}
120+
115121
/**
116122
* Invoke to notify all registered listeners of an updated global checkpoint.
117123
*

server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.mockito.ArgumentCaptor;
2727

2828
import java.io.IOException;
29+
import java.io.UncheckedIOException;
2930
import java.util.List;
3031
import java.util.concurrent.BrokenBarrierException;
3132
import java.util.concurrent.CopyOnWriteArrayList;
@@ -350,11 +351,21 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio
350351
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
351352
final CyclicBarrier barrier = new CyclicBarrier(3);
352353
final int numberOfIterations = randomIntBetween(1, 1024);
353-
354+
final AtomicBoolean closed = new AtomicBoolean();
354355
final Thread updatingThread = new Thread(() -> {
355356
awaitQuietly(barrier);
356357
for (int i = 0; i < numberOfIterations; i++) {
357-
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
358+
if (rarely() && closed.get() == false) {
359+
closed.set(true);
360+
try {
361+
globalCheckpointListeners.close();
362+
} catch (final IOException e) {
363+
throw new UncheckedIOException(e);
364+
}
365+
}
366+
if (closed.get() == false) {
367+
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
368+
}
358369
}
359370
awaitQuietly(barrier);
360371
});
@@ -381,7 +392,10 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio
381392
barrier.await();
382393
barrier.await();
383394
// one last update to ensure all listeners are notified
384-
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
395+
if (closed.get() == false) {
396+
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
397+
}
398+
assertThat(globalCheckpointListeners.pendingListeners(), equalTo(0));
385399
executor.shutdown();
386400
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
387401
for (final AtomicBoolean invocation : invocations) {

0 commit comments

Comments
 (0)