From 3d175ab89dbd078c2fc99c2c3d70c8f1aa104f80 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 6 Aug 2018 21:52:04 -0400 Subject: [PATCH 01/20] Introduce global checkpoint listeners This commit introduces the ability for global checkpoint listeners to be registered at the shard level. These listeners are notified when the global checkpoint is updated, and also when the shard closes. To encapsulate these listeners, we introduce a shard-level component that handles synchronization of notification and modifications to the collection of listeners. --- .../shard/GlobalCheckpointListeners.java | 129 ++++++++ .../elasticsearch/index/shard/IndexShard.java | 21 +- .../shard/GlobalCheckpointListenersTests.java | 299 ++++++++++++++++++ .../index/shard/IndexShardIT.java | 28 ++ 4 files changed, 471 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java new file mode 100644 index 0000000000000..d7fc30f9600c2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executor; + +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +/** + * Represents a collection of global checkpoint listeners. This collection can be added to, and all listeners present at the time of an + * update will be notified together. All listeners will be notified when the shard is closed. + */ +public class GlobalCheckpointListeners implements Closeable { + + /** + * A global checkpoint listener consisting of a callback that is notified when the global checkpoint is updated or the shard is closed. + */ + @FunctionalInterface + public interface GlobalCheckpointListener { + /** + * Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint + * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the + * global checkpoint is updated, the exception will be null. + * + * @param globalCheckpoint the updated global checkpoint + * @param e if non-null, the shard is closed + */ + void accept(final long globalCheckpoint, final IndexShardClosedException e); + } + + // guarded by this + private boolean closed; + private volatile List listeners; + + private final ShardId shardId; + private final Executor executor; + private final Logger logger; + + GlobalCheckpointListeners(final ShardId shardId, final Executor executor, final Logger logger) { + this.shardId = Objects.requireNonNull(shardId); + this.executor = Objects.requireNonNull(executor); + this.logger = Objects.requireNonNull(logger); + } + + synchronized void add(final GlobalCheckpointListener listener) { + if (closed) { + throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); + } + if (listeners == null) { + listeners = new ArrayList<>(); + } + listeners.add(listener); + } + + @Override + public void close() throws IOException { + synchronized (this) { + closed = true; + } + notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); + } + + /** + * Invoke to notify all registered listeners of an updated global checkpoint. + * + * @param globalCheckpoint the updated global checkpoint + */ + void globalCheckpointUpdated(final long globalCheckpoint) { + assert globalCheckpoint >= NO_OPS_PERFORMED; + notifyListeners(globalCheckpoint, null); + } + + private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { + assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); + if (listeners != null) { + final List currentListeners; + synchronized (this) { + currentListeners = listeners; + listeners = null; + } + if (currentListeners != null) { + executor.execute(() -> { + for (final GlobalCheckpointListener listener : currentListeners) { + try { + listener.accept(globalCheckpoint, e); + } catch (final Exception caught) { + if (globalCheckpoint != UNASSIGNED_SEQ_NO) { + logger.warn( + new ParameterizedMessage( + "error notifying global checkpoint listener of updated global checkpoint [{}]", + globalCheckpoint), + caught); + } else { + logger.warn("error notifying global checkpoint listener of closed shard", caught); + } + } + } + }); + } + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 08a0111fb4dc5..89e0e108a460e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -161,6 +161,8 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.index.mapper.SourceToParse.source; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -189,6 +191,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final SearchOperationListener searchOperationListener; + private final GlobalCheckpointListeners globalCheckpointListeners; private final ReplicationTracker replicationTracker; protected volatile ShardRouting shardRouting; @@ -298,8 +301,10 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); + this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger); this.replicationTracker = - new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {}); + new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated); + // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { @@ -664,7 +669,7 @@ private IndexShardState changeState(IndexShardState newState, String reason) { public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse, long autoGeneratedTimestamp, boolean isRetry) throws IOException { assert versionType.validateVersionForWrites(version); - return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, + return applyIndexOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } @@ -765,7 +770,7 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) { public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType) throws IOException { assert versionType.validateVersionForWrites(version); - return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, + return applyDeleteOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, Engine.Operation.Origin.PRIMARY); } @@ -1192,7 +1197,7 @@ public void close(String reason, boolean flushEngine) throws IOException { } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(engine, refreshListeners); + IOUtils.close(engine, globalCheckpointListeners, refreshListeners); indexShardOperationPermits.close(); } } @@ -1729,6 +1734,10 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } + public void addGlobalCheckpointListener(final GlobalCheckpointListeners.GlobalCheckpointListener listener) { + this.globalCheckpointListeners.add(listener); + } + /** * Waits for all operations up to the provided sequence number to complete. * @@ -2273,8 +2282,8 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); final long currentGlobalCheckpoint = getGlobalCheckpoint(); final long localCheckpoint; - if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { - localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; + if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) { + localCheckpoint = NO_OPS_PERFORMED; } else { localCheckpoint = currentGlobalCheckpoint; } diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java new file mode 100644 index 0000000000000..9f8f8a4328bb1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -0,0 +1,299 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + + +public class GlobalCheckpointListenersTests extends ESTestCase { + + final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); + + public void testGlobalCheckpointUpdated() { + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final int numberOfListeners = randomIntBetween(0, 16); + final long[] globalCheckpoints = new long[numberOfListeners]; + for (int i = 0; i < numberOfListeners; i++) { + final int index = i; + final AtomicBoolean invoked = new AtomicBoolean(); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = + (globalCheckpoint, e) -> { + if (invoked.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + assert globalCheckpoint != UNASSIGNED_SEQ_NO; + assert e == null; + globalCheckpoints[index] = globalCheckpoint; + }; + globalCheckpointListeners.add(listener); + } + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); + for (int i = 0; i < numberOfListeners; i++) { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } + + // test the listeners are not invoked twice + final long nextGlobalCheckpoint = randomValueOtherThan(globalCheckpoint, () -> randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE)); + globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); + for (int i = 0; i < numberOfListeners; i++) { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } + } + + public void testClose() throws IOException { + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final int numberOfListeners = randomIntBetween(0, 16); + final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; + for (int i = 0; i < numberOfListeners; i++) { + final int index = i; + final AtomicBoolean invoked = new AtomicBoolean(); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = + (globalCheckpoint, e) -> { + if (invoked.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + assert globalCheckpoint == UNASSIGNED_SEQ_NO; + assert e != null; + exceptions[index] = e; + }; + globalCheckpointListeners.add(listener); + } + globalCheckpointListeners.close(); + for (int i = 0; i < numberOfListeners; i++) { + assertNotNull(exceptions[i]); + assertThat(exceptions[i].getShardId(), equalTo(shardId)); + } + + // test the listeners are not invoked twice + for (int i = 0; i < numberOfListeners; i++) { + exceptions[i] = null; + } + globalCheckpointListeners.close(); + for (int i = 0; i < numberOfListeners; i++) { + assertNull(exceptions[i]); + } + } + + public void testAddAfterClose() throws IOException { + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + globalCheckpointListeners.close(); + final IllegalStateException expected = + expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(((globalCheckpoint, e) -> {}))); + assertThat( + expected, + hasToString(containsString("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"))); + } + + public void testFailingListenerOnUpdate() { + final Logger mockLogger = mock(Logger.class); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final int numberOfListeners = randomIntBetween(0, 16); + final boolean[] failures = new boolean[numberOfListeners]; + final long[] globalCheckpoints = new long[numberOfListeners]; + for (int i = 0; i < numberOfListeners; i++) { + final int index = i; + final boolean failure = randomBoolean(); + failures[index] = failure; + final GlobalCheckpointListeners.GlobalCheckpointListener listener = + (globalCheckpoint, e) -> { + assert globalCheckpoint != UNASSIGNED_SEQ_NO; + assert e == null; + if (failure) { + globalCheckpoints[index] = Long.MIN_VALUE; + throw new RuntimeException("failure"); + } else { + globalCheckpoints[index] = globalCheckpoint; + } + }; + globalCheckpointListeners.add(listener); + } + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); + for (int i = 0; i < numberOfListeners; i++) { + if (failures[i]) { + assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE)); + } else { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } + } + int failureCount = 0; + for (int i = 0; i < numberOfListeners; i++) { + if (failures[i]) { + failureCount++; + } + } + if (failureCount > 0) { + final ArgumentCaptor message = ArgumentCaptor.forClass(ParameterizedMessage.class); + final ArgumentCaptor t = ArgumentCaptor.forClass(RuntimeException.class); + verify(mockLogger, times(failureCount)).warn(message.capture(), t.capture()); + assertThat( + message.getValue().getFormat(), + equalTo("error notifying global checkpoint listener of updated global checkpoint [{}]")); + assertNotNull(message.getValue().getParameters()); + assertThat(message.getValue().getParameters().length, equalTo(1)); + assertThat(message.getValue().getParameters()[0], equalTo(globalCheckpoint)); + assertNotNull(t.getValue()); + assertThat(t.getValue().getMessage(), equalTo("failure")); + } + } + + public void testFailingListenerOnClose() throws IOException { + final Logger mockLogger = mock(Logger.class); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final int numberOfListeners = randomIntBetween(0, 16); + final boolean[] failures = new boolean[numberOfListeners]; + final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; + for (int i = 0; i < numberOfListeners; i++) { + final int index = i; + final boolean failure = randomBoolean(); + failures[index] = failure; + final GlobalCheckpointListeners.GlobalCheckpointListener listener = + (globalCheckpoint, e) -> { + assert globalCheckpoint == UNASSIGNED_SEQ_NO; + assert e != null; + if (failure) { + throw new RuntimeException("failure"); + } else { + exceptions[index] = e; + } + }; + globalCheckpointListeners.add(listener); + } + globalCheckpointListeners.close(); + for (int i = 0; i < numberOfListeners; i++) { + if (failures[i]) { + assertNull(exceptions[i]); + } else { + assertNotNull(exceptions[i]); + assertThat(exceptions[i].getShardId(), equalTo(shardId)); + } + } + int failureCount = 0; + for (int i = 0; i < numberOfListeners; i++) { + if (failures[i]) { + failureCount++; + } + } + if (failureCount > 0) { + final ArgumentCaptor message = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor t = ArgumentCaptor.forClass(RuntimeException.class); + verify(mockLogger, times(failureCount)).warn(message.capture(), t.capture()); + assertThat(message.getValue(), equalTo("error notifying global checkpoint listener of closed shard")); + assertNotNull(t.getValue()); + assertThat(t.getValue().getMessage(), equalTo("failure")); + } + } + + public void testNotificationUsesExecutor() { + final AtomicInteger count = new AtomicInteger(); + final Executor executor = command -> { + count.incrementAndGet(); + command.run(); + }; + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + final int numberOfListeners = randomIntBetween(0, 16); + for (int i = 0; i < numberOfListeners; i++) { + globalCheckpointListeners.add(((globalCheckpoint, e) -> {})); + } + globalCheckpointListeners.globalCheckpointUpdated(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE)); + assertThat(count.get(), equalTo(1)); + } + + public void testConcurrency() throws BrokenBarrierException, InterruptedException { + final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, executor, logger); + final CyclicBarrier barrier = new CyclicBarrier(3); + final int numberOfIterations = randomIntBetween(1, 1024); + final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); + + final Thread updatingThread = new Thread(() -> { + awaitQuietly(barrier); + for (int i = 0; i < numberOfIterations; i++) { + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); + } + awaitQuietly(barrier); + }); + + final List invocations = new CopyOnWriteArrayList<>(); + final Thread listenersThread = new Thread(() -> { + awaitQuietly(barrier); + for (int i = 0; i < numberOfIterations; i++) { + final AtomicBoolean invocation = new AtomicBoolean(); + invocations.add(invocation); + globalCheckpointListeners.add(((g, e) -> { + if (invocation.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + })); + } + awaitQuietly(barrier); + }); + updatingThread.start(); + listenersThread.start(); + barrier.await(); + barrier.await(); + // one last update to ensure all listeners are notified + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + for (final AtomicBoolean invocation : invocations) { + assertTrue(invocation.get()); + } + updatingThread.join(); + listenersThread.join(); + } + + private void awaitQuietly(CyclicBarrier barrier) { + try { + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + } + +} \ No newline at end of file diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index bda6de8aa7d61..75d9a22444c1f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -91,6 +91,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -101,11 +102,13 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -729,4 +732,29 @@ public void testPendingRefreshWithIntervalChange() throws InterruptedException { assertTrue(shard.isSearchIdle()); assertHitCount(client().prepareSearch().get(), 3); } + + public void testGlobalCheckpointListeners() throws IOException { + createIndex("test", Settings.builder().put("index.number_of_replicas", 0).build()); + ensureGreen(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + final int numberOfUpdates = randomIntBetween(1, 128); + for (int i = 0; i < numberOfUpdates; i++) { + final AtomicLong globalCheckpoint = new AtomicLong(); + shard.addGlobalCheckpointListener((g, e) -> globalCheckpoint.set(g)); + client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + assertThat(globalCheckpoint.get(), equalTo((long) i)); + } + final AtomicBoolean invoked = new AtomicBoolean(); + shard.addGlobalCheckpointListener((g, e) -> { + invoked.set(true); + assert g == UNASSIGNED_SEQ_NO; + assert e != null; + assertThat(e.getShardId(), equalTo(shard.shardId())); + }); + shard.close("closed", randomBoolean()); + assertTrue(invoked.get()); + } + } From 2d3121237e88a5a0ecca8ab567a5bb9f60cde110 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 22:55:22 -0400 Subject: [PATCH 02/20] Formatting --- .../index/shard/GlobalCheckpointListenersTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 9f8f8a4328bb1..f0f99bcbc1b25 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -296,4 +296,4 @@ private void awaitQuietly(CyclicBarrier barrier) { } } -} \ No newline at end of file +} From ec2f4582e648af303879abb1281a0d501f94a2f3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 22:55:41 -0400 Subject: [PATCH 03/20] Imports --- .../test/java/org/elasticsearch/index/shard/IndexShardIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 75d9a22444c1f..cb1bdea05f718 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -108,7 +108,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; From 06db24170997e568315b2130ee2d4e36840100d7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 22:57:29 -0400 Subject: [PATCH 04/20] Javadocs --- .../index/shard/GlobalCheckpointListeners.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index d7fc30f9600c2..14e8e888765b1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -62,12 +62,24 @@ public interface GlobalCheckpointListener { private final Executor executor; private final Logger logger; + /** + * Construct a global checkpoint listeners collection. + * + * @param shardId the shard ID on which global checkpoint updates can be listened to + * @param executor the executor for listener notifications + * @param logger a shard-level logger + */ GlobalCheckpointListeners(final ShardId shardId, final Executor executor, final Logger logger) { this.shardId = Objects.requireNonNull(shardId); this.executor = Objects.requireNonNull(executor); this.logger = Objects.requireNonNull(logger); } + /** + * Add a global checkpoint listener. + * + * @param listener the listener + */ synchronized void add(final GlobalCheckpointListener listener) { if (closed) { throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); From eab2a87242d3078569fb5849b7f5555913d61b2d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 23:03:08 -0400 Subject: [PATCH 05/20] Redundant modifiers --- .../elasticsearch/index/shard/GlobalCheckpointListeners.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 14e8e888765b1..7b9ce47d1f022 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -51,7 +51,7 @@ public interface GlobalCheckpointListener { * @param globalCheckpoint the updated global checkpoint * @param e if non-null, the shard is closed */ - void accept(final long globalCheckpoint, final IndexShardClosedException e); + void accept(long globalCheckpoint, IndexShardClosedException e); } // guarded by this From c34a054891176803758dbddce5faf88a6dd4b433 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 23:05:26 -0400 Subject: [PATCH 06/20] Missing modifiers --- .../index/shard/GlobalCheckpointListenersTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index f0f99bcbc1b25..9701d5cd8aefa 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -288,10 +288,10 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio listenersThread.join(); } - private void awaitQuietly(CyclicBarrier barrier) { + private void awaitQuietly(final CyclicBarrier barrier) { try { barrier.await(); - } catch (BrokenBarrierException | InterruptedException e) { + } catch (final BrokenBarrierException | InterruptedException e) { throw new AssertionError(e); } } From a67e6aad36aa33e070b8bbd766c42c20080d91bf Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 23:16:00 -0400 Subject: [PATCH 07/20] Concurrency! --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index cb1bdea05f718..25a0b13952053 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -732,18 +732,19 @@ public void testPendingRefreshWithIntervalChange() throws InterruptedException { assertHitCount(client().prepareSearch().get(), 3); } - public void testGlobalCheckpointListeners() throws IOException { - createIndex("test", Settings.builder().put("index.number_of_replicas", 0).build()); + public void testGlobalCheckpointListeners() throws Exception { + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); ensureGreen(); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); final IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); final int numberOfUpdates = randomIntBetween(1, 128); for (int i = 0; i < numberOfUpdates; i++) { + final int index = i; final AtomicLong globalCheckpoint = new AtomicLong(); shard.addGlobalCheckpointListener((g, e) -> globalCheckpoint.set(g)); client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); - assertThat(globalCheckpoint.get(), equalTo((long) i)); + assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index))); } final AtomicBoolean invoked = new AtomicBoolean(); shard.addGlobalCheckpointListener((g, e) -> { @@ -753,7 +754,7 @@ public void testGlobalCheckpointListeners() throws IOException { assertThat(e.getShardId(), equalTo(shard.shardId())); }); shard.close("closed", randomBoolean()); - assertTrue(invoked.get()); + assertBusy(() -> assertTrue(invoked.get())); } } From f26ce14779dac71310e8200c30490a013b35b418 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 23:17:27 -0400 Subject: [PATCH 08/20] Paranoia assertions --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 25a0b13952053..ac3de2787d5da 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -102,6 +102,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -742,7 +743,11 @@ public void testGlobalCheckpointListeners() throws Exception { for (int i = 0; i < numberOfUpdates; i++) { final int index = i; final AtomicLong globalCheckpoint = new AtomicLong(); - shard.addGlobalCheckpointListener((g, e) -> globalCheckpoint.set(g)); + shard.addGlobalCheckpointListener((g, e) -> { + assert g >= NO_OPS_PERFORMED; + assert e == null; + globalCheckpoint.set(g); + }); client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index))); } From 9e5ce5291649b41d33b4e26cb8f6d6ea7537bd55 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 8 Aug 2018 20:30:20 -0400 Subject: [PATCH 09/20] Add auto-fire for listeners --- .../shard/GlobalCheckpointListeners.java | 36 +++++-- .../elasticsearch/index/shard/IndexShard.java | 16 ++- .../shard/GlobalCheckpointListenersTests.java | 102 +++++++++++++----- .../index/shard/IndexShardIT.java | 36 +++++-- 4 files changed, 141 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 7b9ce47d1f022..ea4657a39e4fa 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.function.LongSupplier; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -59,35 +60,50 @@ public interface GlobalCheckpointListener { private volatile List listeners; private final ShardId shardId; + private final LongSupplier globalCheckpointSupplier; private final Executor executor; private final Logger logger; /** * Construct a global checkpoint listeners collection. * - * @param shardId the shard ID on which global checkpoint updates can be listened to - * @param executor the executor for listener notifications - * @param logger a shard-level logger + * @param shardId the shard ID on which global checkpoint updates can be listened to + * @param globalCheckpointSupplier the global checkpoint supplier + * @param executor the executor for listener notifications + * @param logger a shard-level logger */ - GlobalCheckpointListeners(final ShardId shardId, final Executor executor, final Logger logger) { + GlobalCheckpointListeners( + final ShardId shardId, + final LongSupplier globalCheckpointSupplier, + final Executor executor, + final Logger logger) { this.shardId = Objects.requireNonNull(shardId); + this.globalCheckpointSupplier = Objects.requireNonNull(globalCheckpointSupplier); this.executor = Objects.requireNonNull(executor); this.logger = Objects.requireNonNull(logger); } /** - * Add a global checkpoint listener. + * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the + * listener will fire immediately on the calling thread. * - * @param listener the listener + * @param currentGlobalCheckpoint the current global checkpoint known to the listener + * @param listener the listener */ - synchronized void add(final GlobalCheckpointListener listener) { + synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { if (closed) { throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); } - if (listeners == null) { - listeners = new ArrayList<>(); + final long globalCheckpoint = globalCheckpointSupplier.getAsLong(); + if (globalCheckpoint > currentGlobalCheckpoint) { + // notify directly + listener.accept(globalCheckpoint, null); + } else { + if (listeners == null) { + listeners = new ArrayList<>(); + } + listeners.add(listener); } - listeners.add(listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 89e0e108a460e..2b6f611dd4792 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -301,7 +301,8 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); - this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger); + this.globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, this::getGlobalCheckpoint, threadPool.executor(ThreadPool.Names.LISTENER), logger); this.replicationTracker = new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated); @@ -1734,8 +1735,17 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } - public void addGlobalCheckpointListener(final GlobalCheckpointListeners.GlobalCheckpointListener listener) { - this.globalCheckpointListeners.add(listener); + /** + * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the + * listener will fire immediately on the calling thread. + * + * @param currentGlobalCheckpoint the current global checkpoint known to the listener + * @param listener the listener + */ + public void addGlobalCheckpointListener( + final long currentGlobalCheckpoint, + final GlobalCheckpointListeners.GlobalCheckpointListener listener) { + this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener); } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 9701d5cd8aefa..e94da313e00a6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -52,23 +52,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); - public void testGlobalCheckpointUpdated() { - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + public void testGlobalCheckpointUpdated() throws IOException { + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; final AtomicBoolean invoked = new AtomicBoolean(); final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (globalCheckpoint, e) -> { + (g, e) -> { if (invoked.compareAndSet(false, true) == false) { throw new IllegalStateException("listener invoked twice"); } - assert globalCheckpoint != UNASSIGNED_SEQ_NO; + assert g != UNASSIGNED_SEQ_NO; assert e == null; - globalCheckpoints[index] = globalCheckpoint; + globalCheckpoints[index] = g; }; - globalCheckpointListeners.add(listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); } final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -82,10 +83,54 @@ public void testGlobalCheckpointUpdated() { for (int i = 0; i < numberOfListeners; i++) { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); } + + // closing should also not fire the listeners + globalCheckpointListeners.close(); + for (int i = 0; i < numberOfListeners; i++) { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } + } + + public void testListenersReadyToFire() throws IOException { + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> globalCheckpoint, Runnable::run, logger); + final int numberOfListeners = randomIntBetween(0, 16); + final long[] globalCheckpoints = new long[numberOfListeners]; + for (int i = 0; i < numberOfListeners; i++) { + final int index = i; + final AtomicBoolean invoked = new AtomicBoolean(); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = + (g, e) -> { + if (invoked.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + assert g != UNASSIGNED_SEQ_NO; + assert e == null; + globalCheckpoints[index] = g; + }; + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener); + // the listener should fire immediately + assertThat(globalCheckpoints[index], equalTo(globalCheckpoint)); + } + + // test the listeners are not invoked twice + final long nextGlobalCheckpoint = randomValueOtherThan(globalCheckpoint, () -> randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE)); + globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); + for (int i = 0; i < numberOfListeners; i++) { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } + + // closing should also not fire the listeners + globalCheckpointListeners.close(); + for (int i = 0; i < numberOfListeners; i++) { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } } public void testClose() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); final int numberOfListeners = randomIntBetween(0, 16); final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { @@ -100,7 +145,7 @@ public void testClose() throws IOException { assert e != null; exceptions[index] = e; }; - globalCheckpointListeners.add(listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -119,10 +164,11 @@ public void testClose() throws IOException { } public void testAddAfterClose() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); globalCheckpointListeners.close(); final IllegalStateException expected = - expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(((globalCheckpoint, e) -> {}))); + expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {})); assertThat( expected, hasToString(containsString("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"))); @@ -130,7 +176,8 @@ public void testAddAfterClose() throws IOException { public void testFailingListenerOnUpdate() { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, mockLogger); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; final long[] globalCheckpoints = new long[numberOfListeners]; @@ -149,7 +196,7 @@ public void testFailingListenerOnUpdate() { globalCheckpoints[index] = globalCheckpoint; } }; - globalCheckpointListeners.add(listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); } final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -183,7 +230,8 @@ public void testFailingListenerOnUpdate() { public void testFailingListenerOnClose() throws IOException { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, mockLogger); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; @@ -192,8 +240,8 @@ public void testFailingListenerOnClose() throws IOException { final boolean failure = randomBoolean(); failures[index] = failure; final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (globalCheckpoint, e) -> { - assert globalCheckpoint == UNASSIGNED_SEQ_NO; + (g, e) -> { + assert g == UNASSIGNED_SEQ_NO; assert e != null; if (failure) { throw new RuntimeException("failure"); @@ -201,7 +249,7 @@ public void testFailingListenerOnClose() throws IOException { exceptions[index] = e; } }; - globalCheckpointListeners.add(listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -234,10 +282,11 @@ public void testNotificationUsesExecutor() { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, executor, logger); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { - globalCheckpointListeners.add(((globalCheckpoint, e) -> {})); + globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {}); } globalCheckpointListeners.globalCheckpointUpdated(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE)); assertThat(count.get(), equalTo(1)); @@ -245,11 +294,11 @@ public void testNotificationUsesExecutor() { public void testConcurrency() throws BrokenBarrierException, InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); + final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, executor, logger); + new GlobalCheckpointListeners(shardId, globalCheckpoint::get, executor, logger); final CyclicBarrier barrier = new CyclicBarrier(3); final int numberOfIterations = randomIntBetween(1, 1024); - final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); final Thread updatingThread = new Thread(() -> { awaitQuietly(barrier); @@ -265,11 +314,14 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio for (int i = 0; i < numberOfIterations; i++) { final AtomicBoolean invocation = new AtomicBoolean(); invocations.add(invocation); - globalCheckpointListeners.add(((g, e) -> { - if (invocation.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - })); + // sometimes this will fire the listener immediately + globalCheckpointListeners.add( + globalCheckpoint.get(), + (g, e) -> { + if (invocation.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + }); } awaitQuietly(barrier); }); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index ac3de2787d5da..088e24633fe36 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -743,21 +743,35 @@ public void testGlobalCheckpointListeners() throws Exception { for (int i = 0; i < numberOfUpdates; i++) { final int index = i; final AtomicLong globalCheckpoint = new AtomicLong(); - shard.addGlobalCheckpointListener((g, e) -> { - assert g >= NO_OPS_PERFORMED; - assert e == null; - globalCheckpoint.set(g); - }); + shard.addGlobalCheckpointListener( + i - 1, + (g, e) -> { + assert g >= NO_OPS_PERFORMED; + assert e == null; + globalCheckpoint.set(g); + }); client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index))); + // adding a listener expecting a lower global checkpoint should fire immediately + final AtomicLong immediateGlobalCheckpint = new AtomicLong(); + shard.addGlobalCheckpointListener( + randomLongBetween(NO_OPS_PERFORMED, i - 1), + (g, e) -> { + assert g >= NO_OPS_PERFORMED; + assert e == null; + immediateGlobalCheckpint.set(g); + }); + assertThat(immediateGlobalCheckpint.get(), equalTo((long) index)); } final AtomicBoolean invoked = new AtomicBoolean(); - shard.addGlobalCheckpointListener((g, e) -> { - invoked.set(true); - assert g == UNASSIGNED_SEQ_NO; - assert e != null; - assertThat(e.getShardId(), equalTo(shard.shardId())); - }); + shard.addGlobalCheckpointListener( + numberOfUpdates - 1, + (g, e) -> { + invoked.set(true); + assert g == UNASSIGNED_SEQ_NO; + assert e != null; + assertThat(e.getShardId(), equalTo(shard.shardId())); + }); shard.close("closed", randomBoolean()); assertBusy(() -> assertTrue(invoked.get())); } From 30a16f6617962cb9723fd26fd61cd575342b9435 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 9 Aug 2018 08:08:50 -0700 Subject: [PATCH 10/20] Fork immeidate notifies, add test --- .../shard/GlobalCheckpointListeners.java | 32 +++++----- .../shard/GlobalCheckpointListenersTests.java | 60 ++++++++++++++++--- 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index ea4657a39e4fa..2233696bdaa1c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -97,7 +97,7 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint final long globalCheckpoint = globalCheckpointSupplier.getAsLong(); if (globalCheckpoint > currentGlobalCheckpoint) { // notify directly - listener.accept(globalCheckpoint, null); + executor.execute(() -> notifyListener(listener, globalCheckpoint, null)); } else { if (listeners == null) { listeners = new ArrayList<>(); @@ -135,23 +135,27 @@ private void notifyListeners(final long globalCheckpoint, final IndexShardClosed if (currentListeners != null) { executor.execute(() -> { for (final GlobalCheckpointListener listener : currentListeners) { - try { - listener.accept(globalCheckpoint, e); - } catch (final Exception caught) { - if (globalCheckpoint != UNASSIGNED_SEQ_NO) { - logger.warn( - new ParameterizedMessage( - "error notifying global checkpoint listener of updated global checkpoint [{}]", - globalCheckpoint), - caught); - } else { - logger.warn("error notifying global checkpoint listener of closed shard", caught); - } - } + notifyListener(listener, globalCheckpoint, e); } }); } } } + private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) { + try { + listener.accept(globalCheckpoint, e); + } catch (final Exception caught) { + if (globalCheckpoint != UNASSIGNED_SEQ_NO) { + logger.warn( + new ParameterizedMessage( + "error notifying global checkpoint listener of updated global checkpoint [{}]", + globalCheckpoint), + caught); + } else { + logger.warn("error notifying global checkpoint listener of closed shard", caught); + } + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index e94da313e00a6..c53f8d61debe3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -44,6 +44,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -84,14 +85,14 @@ public void testGlobalCheckpointUpdated() throws IOException { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); } - // closing should also not fire the listeners + // closing should also not notify the listeners globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); } } - public void testListenersReadyToFire() throws IOException { + public void testListenersReadyToBeNotified() throws IOException { final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, () -> globalCheckpoint, Runnable::run, logger); @@ -110,7 +111,7 @@ public void testListenersReadyToFire() throws IOException { globalCheckpoints[index] = g; }; globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener); - // the listener should fire immediately + // the listener should be notified immediately assertThat(globalCheckpoints[index], equalTo(globalCheckpoint)); } @@ -121,13 +122,56 @@ public void testListenersReadyToFire() throws IOException { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); } - // closing should also not fire the listeners + // closing should also not notify the listeners globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); } } + public void testFailingListenerReadyToBeNotified() { + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); + final Logger mockLogger = mock(Logger.class); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, () -> globalCheckpoint, Runnable::run, mockLogger); + final int numberOfListeners = randomIntBetween(0, 16); + final long[] globalCheckpoints = new long[numberOfListeners]; + for (int i = 0; i < numberOfListeners; i++) { + final int index = i; + final boolean failure = randomBoolean(); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = + (g, e) -> { + assert globalCheckpoint != UNASSIGNED_SEQ_NO; + assert e == null; + if (failure) { + globalCheckpoints[index] = Long.MIN_VALUE; + throw new RuntimeException("failure"); + } else { + globalCheckpoints[index] = globalCheckpoint; + } + }; + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener); + // the listener should be notified immediately + if (failure) { + assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE)); + final ArgumentCaptor message = ArgumentCaptor.forClass(ParameterizedMessage.class); + final ArgumentCaptor t = ArgumentCaptor.forClass(RuntimeException.class); + verify(mockLogger).warn(message.capture(), t.capture()); + reset(mockLogger); + assertThat( + message.getValue().getFormat(), + equalTo("error notifying global checkpoint listener of updated global checkpoint [{}]")); + assertNotNull(message.getValue().getParameters()); + assertThat(message.getValue().getParameters().length, equalTo(1)); + assertThat(message.getValue().getParameters()[0], equalTo(globalCheckpoint)); + assertNotNull(t.getValue()); + assertThat(t.getValue().getMessage(), equalTo("failure")); + } else { + assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + } + } + } + public void testClose() throws IOException { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); @@ -186,14 +230,14 @@ public void testFailingListenerOnUpdate() { final boolean failure = randomBoolean(); failures[index] = failure; final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (globalCheckpoint, e) -> { - assert globalCheckpoint != UNASSIGNED_SEQ_NO; + (g, e) -> { + assert g != UNASSIGNED_SEQ_NO; assert e == null; if (failure) { globalCheckpoints[index] = Long.MIN_VALUE; throw new RuntimeException("failure"); } else { - globalCheckpoints[index] = globalCheckpoint; + globalCheckpoints[index] = g; } }; globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); @@ -314,7 +358,7 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio for (int i = 0; i < numberOfIterations; i++) { final AtomicBoolean invocation = new AtomicBoolean(); invocations.add(invocation); - // sometimes this will fire the listener immediately + // sometimes this will notify the listener immediately globalCheckpointListeners.add( globalCheckpoint.get(), (g, e) -> { From 8ac2ed509a02dc1e2a8f6d4e4a793fbd165dc18b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 10 Aug 2018 11:29:53 -0700 Subject: [PATCH 11/20] Remove supplier --- .../shard/GlobalCheckpointListeners.java | 17 ++++---- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../shard/GlobalCheckpointListenersTests.java | 39 +++++++++---------- .../index/shard/IndexShardIT.java | 2 +- 4 files changed, 28 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 2233696bdaa1c..c04ca2030078b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -58,27 +58,24 @@ public interface GlobalCheckpointListener { // guarded by this private boolean closed; private volatile List listeners; + private volatile long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private final ShardId shardId; - private final LongSupplier globalCheckpointSupplier; private final Executor executor; private final Logger logger; /** * Construct a global checkpoint listeners collection. * - * @param shardId the shard ID on which global checkpoint updates can be listened to - * @param globalCheckpointSupplier the global checkpoint supplier - * @param executor the executor for listener notifications - * @param logger a shard-level logger + * @param shardId the shard ID on which global checkpoint updates can be listened to + * @param executor the executor for listener notifications + * @param logger a shard-level logger */ GlobalCheckpointListeners( final ShardId shardId, - final LongSupplier globalCheckpointSupplier, final Executor executor, final Logger logger) { this.shardId = Objects.requireNonNull(shardId); - this.globalCheckpointSupplier = Objects.requireNonNull(globalCheckpointSupplier); this.executor = Objects.requireNonNull(executor); this.logger = Objects.requireNonNull(logger); } @@ -94,10 +91,9 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint if (closed) { throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); } - final long globalCheckpoint = globalCheckpointSupplier.getAsLong(); - if (globalCheckpoint > currentGlobalCheckpoint) { + if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { // notify directly - executor.execute(() -> notifyListener(listener, globalCheckpoint, null)); + executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); } else { if (listeners == null) { listeners = new ArrayList<>(); @@ -121,6 +117,7 @@ public void close() throws IOException { */ void globalCheckpointUpdated(final long globalCheckpoint) { assert globalCheckpoint >= NO_OPS_PERFORMED; + lastKnownGlobalCheckpoint = globalCheckpoint; notifyListeners(globalCheckpoint, null); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2b6f611dd4792..ffce0e6ea8bec 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -301,8 +301,7 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); - this.globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, this::getGlobalCheckpoint, threadPool.executor(ThreadPool.Names.LISTENER), logger); + this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger); this.replicationTracker = new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated); diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index c53f8d61debe3..51459209009fa 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -48,14 +48,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - public class GlobalCheckpointListenersTests extends ESTestCase { final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); public void testGlobalCheckpointUpdated() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { @@ -93,9 +92,9 @@ public void testGlobalCheckpointUpdated() throws IOException { } public void testListenersReadyToBeNotified() throws IOException { + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> globalCheckpoint, Runnable::run, logger); + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { @@ -130,10 +129,10 @@ public void testListenersReadyToBeNotified() throws IOException { } public void testFailingListenerReadyToBeNotified() { - final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> globalCheckpoint, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { @@ -173,8 +172,8 @@ public void testFailingListenerReadyToBeNotified() { } public void testClose() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { @@ -208,8 +207,8 @@ public void testClose() throws IOException { } public void testAddAfterClose() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); globalCheckpointListeners.close(); final IllegalStateException expected = expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {})); @@ -220,8 +219,8 @@ public void testAddAfterClose() throws IOException { public void testFailingListenerOnUpdate() { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; final long[] globalCheckpoints = new long[numberOfListeners]; @@ -274,8 +273,8 @@ public void testFailingListenerOnUpdate() { public void testFailingListenerOnClose() throws IOException { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; @@ -326,8 +325,8 @@ public void testNotificationUsesExecutor() { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, () -> NO_OPS_PERFORMED, executor, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {}); @@ -338,9 +337,9 @@ public void testNotificationUsesExecutor() { public void testConcurrency() throws BrokenBarrierException, InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); - final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, globalCheckpoint::get, executor, logger); + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); final CyclicBarrier barrier = new CyclicBarrier(3); final int numberOfIterations = randomIntBetween(1, 1024); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 088e24633fe36..182747e7dda5d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -761,7 +761,7 @@ public void testGlobalCheckpointListeners() throws Exception { assert e == null; immediateGlobalCheckpint.set(g); }); - assertThat(immediateGlobalCheckpint.get(), equalTo((long) index)); + assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index))); } final AtomicBoolean invoked = new AtomicBoolean(); shard.addGlobalCheckpointListener( From 22d13a818151356cf4f19b96c494aca479ebcbf9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 12 Aug 2018 18:16:26 -0700 Subject: [PATCH 12/20] Fix sync, fix comment --- .../index/shard/GlobalCheckpointListeners.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index c04ca2030078b..1707e7712a042 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -58,7 +58,7 @@ public interface GlobalCheckpointListener { // guarded by this private boolean closed; private volatile List listeners; - private volatile long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; + private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private final ShardId shardId; private final Executor executor; @@ -82,7 +82,7 @@ public interface GlobalCheckpointListener { /** * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will fire immediately on the calling thread. + * listener will be asynchronously fired on the executor used to construct this collection of global checkpoint listeners. * * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener @@ -117,7 +117,9 @@ public void close() throws IOException { */ void globalCheckpointUpdated(final long globalCheckpoint) { assert globalCheckpoint >= NO_OPS_PERFORMED; - lastKnownGlobalCheckpoint = globalCheckpoint; + synchronized (this) { + lastKnownGlobalCheckpoint = globalCheckpoint; + } notifyListeners(globalCheckpoint, null); } From dc5d326f83f9fcf09140283ce4408d471f35f807 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 12 Aug 2018 18:40:43 -0700 Subject: [PATCH 13/20] Remove import --- .../org/elasticsearch/index/shard/GlobalCheckpointListeners.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 1707e7712a042..d8f7e4cbdab37 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; -import java.util.function.LongSupplier; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; From ea3c198ca0b66981248df597074f39f3f7719e59 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 13 Aug 2018 08:12:30 -0400 Subject: [PATCH 14/20] Immediately notify listener on close --- .../shard/GlobalCheckpointListeners.java | 6 +++-- .../shard/GlobalCheckpointListenersTests.java | 22 ++++++++++++++----- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index d8f7e4cbdab37..ee30b9d85056c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -81,14 +81,16 @@ public interface GlobalCheckpointListener { /** * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will be asynchronously fired on the executor used to construct this collection of global checkpoint listeners. + * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the + * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global + * checkpoint listeners. * * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener */ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { if (closed) { - throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); + executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); } if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { // notify directly diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 51459209009fa..b64f6c2293e6c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -206,15 +207,24 @@ public void testClose() throws IOException { } } - public void testAddAfterClose() throws IOException { + public void testAddAfterClose() throws InterruptedException, IOException { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); globalCheckpointListeners.close(); - final IllegalStateException expected = - expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {})); - assertThat( - expected, - hasToString(containsString("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"))); + final AtomicBoolean invoked = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> { + assert g == UNASSIGNED_SEQ_NO; + assert e != null; + if (invoked.compareAndSet(false, true) == false) { + latch.countDown(); + throw new IllegalStateException("listener invoked twice"); + } + latch.countDown(); + }; + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener); + latch.await(); + assertTrue(invoked.get()); } public void testFailingListenerOnUpdate() { From 7e7727835c3f7342bcd4b07b0a323ea669118932 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 13 Aug 2018 14:18:31 -0400 Subject: [PATCH 15/20] Remove imports --- .../index/shard/GlobalCheckpointListenersTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index b64f6c2293e6c..a5a3c86a91164 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -41,9 +41,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasToString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; From 48089fb2a6ad7356ef7de353da56e6286a00e454 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Aug 2018 10:03:22 -0400 Subject: [PATCH 16/20] Add assertion --- .../elasticsearch/index/shard/GlobalCheckpointListeners.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index ee30b9d85056c..0f62e6a8049c6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -119,6 +119,9 @@ public void close() throws IOException { void globalCheckpointUpdated(final long globalCheckpoint) { assert globalCheckpoint >= NO_OPS_PERFORMED; synchronized (this) { + assert globalCheckpoint > lastKnownGlobalCheckpoint + : "updated global checkpoint [" + globalCheckpoint + "]" + + " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]"; lastKnownGlobalCheckpoint = globalCheckpoint; } notifyListeners(globalCheckpoint, null); From 5b635071f94805a3ba3e89c85303e7792efe21bc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Aug 2018 15:39:30 -0400 Subject: [PATCH 17/20] Update javadoc --- .../elasticsearch/index/shard/GlobalCheckpointListeners.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 0f62e6a8049c6..38c8ee85d8429 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -83,7 +83,8 @@ public interface GlobalCheckpointListener { * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global - * checkpoint listeners. + * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard + * is closed. A listener must re-register after one of these events to receive subsequent events. * * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener From 9533679a44b40a87446f4bacb2188e0ce426e10d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Aug 2018 16:07:45 -0400 Subject: [PATCH 18/20] Fix double notify bug --- .../shard/GlobalCheckpointListeners.java | 8 +++++++- .../shard/GlobalCheckpointListenersTests.java | 20 ++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 38c8ee85d8429..24441743d5c30 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -92,10 +92,12 @@ public interface GlobalCheckpointListener { synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { if (closed) { executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); + return; } if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { // notify directly - executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); + executor.execute(() -> listener.accept(lastKnownGlobalCheckpoint, null)); + return; } else { if (listeners == null) { listeners = new ArrayList<>(); @@ -112,6 +114,10 @@ public void close() throws IOException { notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); } + synchronized int pendingListeners() { + return listeners == null ? 0 : listeners.size(); + } + /** * Invoke to notify all registered listeners of an updated global checkpoint. * diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index a5a3c86a91164..6a7e27f8a0cb6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -26,6 +26,7 @@ import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; @@ -350,11 +351,21 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); final CyclicBarrier barrier = new CyclicBarrier(3); final int numberOfIterations = randomIntBetween(1, 1024); - + final AtomicBoolean closed = new AtomicBoolean(); final Thread updatingThread = new Thread(() -> { awaitQuietly(barrier); for (int i = 0; i < numberOfIterations; i++) { - globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); + if (rarely() && closed.get() == false) { + closed.set(true); + try { + globalCheckpointListeners.close(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + } + if (closed.get() == false) { + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); + } } awaitQuietly(barrier); }); @@ -381,7 +392,10 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio barrier.await(); barrier.await(); // one last update to ensure all listeners are notified - globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); + if (closed.get() == false) { + globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); + } + assertThat(globalCheckpointListeners.pendingListeners(), equalTo(0)); executor.shutdown(); executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); for (final AtomicBoolean invocation : invocations) { From 88dee763f97c5051edd110836da03ad5a4c95b21 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Aug 2018 16:26:54 -0400 Subject: [PATCH 19/20] Simple sync --- .../shard/GlobalCheckpointListeners.java | 31 ++++++++----------- .../shard/GlobalCheckpointListenersTests.java | 4 +-- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 24441743d5c30..e279badec4a04 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -91,12 +91,12 @@ public interface GlobalCheckpointListener { */ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { if (closed) { - executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); + executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); return; } if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { // notify directly - executor.execute(() -> listener.accept(lastKnownGlobalCheckpoint, null)); + executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); return; } else { if (listeners == null) { @@ -107,10 +107,8 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint } @Override - public void close() throws IOException { - synchronized (this) { - closed = true; - } + public synchronized void close() throws IOException { + closed = true; notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); } @@ -123,25 +121,22 @@ synchronized int pendingListeners() { * * @param globalCheckpoint the updated global checkpoint */ - void globalCheckpointUpdated(final long globalCheckpoint) { + synchronized void globalCheckpointUpdated(final long globalCheckpoint) { assert globalCheckpoint >= NO_OPS_PERFORMED; - synchronized (this) { - assert globalCheckpoint > lastKnownGlobalCheckpoint - : "updated global checkpoint [" + globalCheckpoint + "]" - + " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]"; - lastKnownGlobalCheckpoint = globalCheckpoint; - } + assert globalCheckpoint > lastKnownGlobalCheckpoint + : "updated global checkpoint [" + globalCheckpoint + "]" + + " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]"; + lastKnownGlobalCheckpoint = globalCheckpoint; notifyListeners(globalCheckpoint, null); } private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { + assert Thread.holdsLock(this); assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); if (listeners != null) { - final List currentListeners; - synchronized (this) { - currentListeners = listeners; - listeners = null; - } + // capture the current listeners + final List currentListeners = listeners; + listeners = null; if (currentListeners != null) { executor.execute(() -> { for (final GlobalCheckpointListener listener : currentListeners) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 6a7e27f8a0cb6..06f11ff4e43db 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -78,7 +78,7 @@ public void testGlobalCheckpointUpdated() throws IOException { } // test the listeners are not invoked twice - final long nextGlobalCheckpoint = randomValueOtherThan(globalCheckpoint, () -> randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE)); + final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); for (int i = 0; i < numberOfListeners; i++) { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); @@ -115,7 +115,7 @@ public void testListenersReadyToBeNotified() throws IOException { } // test the listeners are not invoked twice - final long nextGlobalCheckpoint = randomValueOtherThan(globalCheckpoint, () -> randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE)); + final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); for (int i = 0; i < numberOfListeners; i++) { assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); From 07dab4ff80965bdeeb36782fa9868e028cc02981 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 15 Aug 2018 08:41:48 -0400 Subject: [PATCH 20/20] Add comment --- .../index/shard/GlobalCheckpointListenersTests.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 06f11ff4e43db..d9240602d8519 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -349,10 +349,12 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); + // we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread final CyclicBarrier barrier = new CyclicBarrier(3); final int numberOfIterations = randomIntBetween(1, 1024); final AtomicBoolean closed = new AtomicBoolean(); final Thread updatingThread = new Thread(() -> { + // synchronize starting with the listener thread and the main test thread awaitQuietly(barrier); for (int i = 0; i < numberOfIterations; i++) { if (rarely() && closed.get() == false) { @@ -367,11 +369,13 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); } } + // synchronize ending with the listener thread and the main test thread awaitQuietly(barrier); }); final List invocations = new CopyOnWriteArrayList<>(); final Thread listenersThread = new Thread(() -> { + // synchronize starting with the updating thread and the main test thread awaitQuietly(barrier); for (int i = 0; i < numberOfIterations; i++) { final AtomicBoolean invocation = new AtomicBoolean(); @@ -385,11 +389,14 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio } }); } + // synchronize ending with the updating thread and the main test thread awaitQuietly(barrier); }); updatingThread.start(); listenersThread.start(); + // synchronize starting with the updating thread and the listener thread barrier.await(); + // synchronize ending with the updating thread and the listener thread barrier.await(); // one last update to ensure all listeners are notified if (closed.get() == false) {