Skip to content

Commit a9ae5b6

Browse files
committed
Introduce global checkpoint background sync
It is the exciting return of the global checkpoint background sync. Long, long ago, in a snapshot version far, far away, we had, and only had, a global checkpoint background sync. This sync would fire periodically and send the global checkpoint from the primary shard to the replicas so that they could update their local knowledge of the global checkpoint. Later in time, as we sped ahead towards finalizing the initial version of sequence IDs, we realized that we needed the global checkpoint updates to be inline. This means that on a replication operation, the primary shard would piggyback the global checkpoint with the replication operation to the replicas. The replicas would update their local knowledge of the global checkpoint and reply with their local checkpoint. However, this could allow the global checkpoint on the primary to advance again and the replicas would fall behind in their local knowledge of the global checkpoint. If another replication operation never fired, then the replicas would be permanently behind. To account for this, we added one more sync that would fire when the primary shard fell idle. However, this has problems:
- the shard idle timer defaults to five minutes, a long time to wait for the replicas to learn of the new global checkpoint
- if a replica missed the sync, there was no follow-up sync to catch them up
- there is an inherent race condition where the primary shard could fall idle mid-operation (after having sent the replication request to the replicas); in this case, there would never be a background sync after the operation completes
- tying the global checkpoint sync to the idle timer was never natural
To fix this, we make two additional changes for the global checkpoint to be synced to the replicas. The first is that we add a post-operation sync that only fires if there are no operations in flight and there is a lagging replica.
This gives us a chance to sync the global checkpoint to the replicas immediately after an operation so that they are always kept up to date. The second is that we add back a global checkpoint background sync that fires on a timer. This timer fires every thirty seconds, and is not configurable (for simplicity). This background sync is smarter than what we had previously in the sense that it only sends a sync if the global checkpoint on at least one replica is lagging behind that of the primary. When the timer fires, we can compare the global checkpoint on the primary to its knowledge of the global checkpoint on the replicas and only send a sync if there is a shard that is behind. Relates #26591
1 parent a5ba7c7 commit a9ae5b6

24 files changed

+675
-173
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ task verifyVersions {
175175
* after the backport of the backcompat code is complete.
176176
*/
177177
allprojects {
178-
ext.bwc_tests_enabled = true
178+
ext.bwc_tests_enabled = false
179179
}
180180

181181
task verifyBwcTestsEnabled {

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package org.elasticsearch.action.support.replication;
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.apache.lucene.store.AlreadyClosedException;
2324
import org.elasticsearch.ElasticsearchException;
25+
import org.elasticsearch.ExceptionsHelper;
2426
import org.elasticsearch.Version;
2527
import org.elasticsearch.action.ActionListener;
2628
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -55,6 +57,7 @@
5557
import org.elasticsearch.index.IndexService;
5658
import org.elasticsearch.index.seqno.SequenceNumbers;
5759
import org.elasticsearch.index.shard.IndexShard;
60+
import org.elasticsearch.index.shard.IndexShardClosedException;
5861
import org.elasticsearch.index.shard.IndexShardState;
5962
import org.elasticsearch.index.shard.ReplicationGroup;
6063
import org.elasticsearch.index.shard.ShardId;
@@ -108,12 +111,26 @@ public abstract class TransportReplicationAction<
108111
protected final String transportReplicaAction;
109112
protected final String transportPrimaryAction;
110113

114+
private final boolean syncGlobalCheckpointAfterOperation;
115+
111116
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
112117
ClusterService clusterService, IndicesService indicesService,
113118
ThreadPool threadPool, ShardStateAction shardStateAction,
114119
ActionFilters actionFilters,
115120
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
116121
Supplier<ReplicaRequest> replicaRequest, String executor) {
122+
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
123+
indexNameExpressionResolver, request, replicaRequest, executor, false);
124+
}
125+
126+
127+
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
128+
ClusterService clusterService, IndicesService indicesService,
129+
ThreadPool threadPool, ShardStateAction shardStateAction,
130+
ActionFilters actionFilters,
131+
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
132+
Supplier<ReplicaRequest> replicaRequest, String executor,
133+
boolean syncGlobalCheckpointAfterOperation) {
117134
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
118135
this.transportService = transportService;
119136
this.clusterService = clusterService;
@@ -126,6 +143,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
126143
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
127144

128145
this.transportOptions = transportOptions();
146+
147+
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
129148
}
130149

131150
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
@@ -150,7 +169,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
150169
new ReroutePhase((ReplicationTask) task, request, listener).run();
151170
}
152171

153-
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
172+
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
154173
return new ReplicasProxy(primaryTerm);
155174
}
156175

@@ -359,6 +378,17 @@ private ActionListener<Response> createResponseListener(final PrimaryShardRefere
359378
return new ActionListener<Response>() {
360379
@Override
361380
public void onResponse(Response response) {
381+
if (syncGlobalCheckpointAfterOperation) {
382+
try {
383+
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
384+
} catch (final Exception e) {
385+
// only log non-closed exceptions
386+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
387+
logger.info("post-operation global checkpoint sync failed", e);
388+
// intentionally swallow, a missed global checkpoint sync should not fail this operation
389+
}
390+
}
391+
}
362392
primaryShardReference.close(); // release shard operation lock before responding to caller
363393
setPhase(replicationTask, "finished");
364394
try {

core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
7171
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
7272
Supplier<ReplicaRequest> replicaRequest, String executor) {
7373
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
74-
indexNameExpressionResolver, request, replicaRequest, executor);
74+
indexNameExpressionResolver, request, replicaRequest, executor, true);
7575
}
7676

7777
/** Syncs operation result to the translog or throws a shard not available failure */

core/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.lucene.util.SetOnce;
2323
import org.elasticsearch.client.Client;
2424
import org.elasticsearch.common.Strings;
25-
import org.elasticsearch.common.TriFunction;
2625
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2726
import org.elasticsearch.common.settings.Setting;
2827
import org.elasticsearch.common.settings.Setting.Property;
@@ -40,6 +39,7 @@
4039
import org.elasticsearch.index.shard.IndexSearcherWrapper;
4140
import org.elasticsearch.index.shard.IndexingOperationListener;
4241
import org.elasticsearch.index.shard.SearchOperationListener;
42+
import org.elasticsearch.index.shard.ShardId;
4343
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
4444
import org.elasticsearch.index.similarity.SimilarityProvider;
4545
import org.elasticsearch.index.similarity.SimilarityService;

core/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@
2525
import org.apache.lucene.store.AlreadyClosedException;
2626
import org.apache.lucene.util.Accountable;
2727
import org.apache.lucene.util.IOUtils;
28+
import org.elasticsearch.action.ActionListener;
2829
import org.elasticsearch.client.Client;
2930
import org.elasticsearch.cluster.metadata.IndexMetaData;
3031
import org.elasticsearch.cluster.routing.ShardRouting;
3132
import org.elasticsearch.common.Nullable;
3233
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
34+
import org.elasticsearch.common.lease.Releasable;
35+
import org.elasticsearch.common.settings.Setting;
36+
import org.elasticsearch.common.settings.Setting.Property;
3337
import org.elasticsearch.common.settings.Settings;
3438
import org.elasticsearch.common.unit.TimeValue;
3539
import org.elasticsearch.common.util.BigArrays;
@@ -82,6 +86,7 @@
8286
import java.util.concurrent.ScheduledFuture;
8387
import java.util.concurrent.TimeUnit;
8488
import java.util.concurrent.atomic.AtomicBoolean;
89+
import java.util.function.Consumer;
8590
import java.util.function.LongSupplier;
8691
import java.util.function.Supplier;
8792

@@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
109114
private final AtomicBoolean closed = new AtomicBoolean(false);
110115
private final AtomicBoolean deleted = new AtomicBoolean(false);
111116
private final IndexSettings indexSettings;
112-
private final List<IndexingOperationListener> indexingOperationListeners;
113117
private final List<SearchOperationListener> searchOperationListeners;
118+
private final List<IndexingOperationListener> indexingOperationListeners;
114119
private volatile AsyncRefreshTask refreshTask;
115120
private volatile AsyncTranslogFSync fsyncTask;
121+
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
116122

117123
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
118124
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -182,11 +188,12 @@ public IndexService(
182188
this.engineFactory = engineFactory;
183189
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
184190
this.searcherWrapper = wrapperFactory.newWrapper(this);
185-
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
186191
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
192+
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
187193
// kick off async ops for the first shard in this index
188194
this.refreshTask = new AsyncRefreshTask(this);
189195
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
196+
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
190197
rescheduleFsyncTask(indexSettings.getTranslogDurability());
191198
}
192199

@@ -268,7 +275,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
268275
}
269276
}
270277
} finally {
271-
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask);
278+
IOUtils.close(
279+
bitsetFilterCache,
280+
indexCache,
281+
indexFieldData,
282+
mapperService,
283+
refreshTask,
284+
fsyncTask,
285+
trimTranslogTask,
286+
globalCheckpointTask);
272287
}
273288
}
274289
}
@@ -293,8 +308,7 @@ private long getAvgShardSizeInBytes() throws IOException {
293308
}
294309
}
295310

296-
public synchronized IndexShard createShard(ShardRouting routing) throws IOException {
297-
final boolean primary = routing.primary();
311+
public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
298312
/*
299313
* TODO: we execute this in parallel but it's a synced method. Yet, we might
300314
* be able to serialize the execution via the cluster state in the future. for now we just
@@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
365379
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
366380
indexCache, mapperService, similarityService, engineFactory,
367381
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
368-
searchOperationListeners, indexingOperationListeners);
382+
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
369383
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
370384
eventListener.afterIndexShardCreated(indexShard);
371385
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
@@ -710,6 +724,44 @@ private void maybeTrimTranslog() {
710724
}
711725
}
712726

727+
private void maybeSyncGlobalCheckpoints() {
728+
for (final IndexShard shard : this.shards.values()) {
729+
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
730+
switch (shard.state()) {
731+
case CLOSED:
732+
case CREATED:
733+
case RECOVERING:
734+
case RELOCATED:
735+
continue;
736+
case POST_RECOVERY:
737+
assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active";
738+
continue;
739+
case STARTED:
740+
try {
741+
shard.acquirePrimaryOperationPermit(
742+
ActionListener.wrap(
743+
releasable -> {
744+
try (Releasable ignored = releasable) {
745+
shard.maybeSyncGlobalCheckpoint("background");
746+
}
747+
},
748+
e -> {
749+
if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) {
750+
logger.info("failed to execute background global checkpoint sync", e);
751+
}
752+
}),
753+
ThreadPool.Names.SAME);
754+
} catch (final AlreadyClosedException | IndexShardClosedException e) {
755+
// the shard was closed concurrently, continue
756+
}
757+
continue;
758+
default:
759+
throw new IllegalStateException("unknown state [" + shard.state() + "]");
760+
}
761+
}
762+
}
763+
}
764+
713765
abstract static class BaseAsyncTask implements Runnable, Closeable {
714766
protected final IndexService indexService;
715767
protected final ThreadPool threadPool;
@@ -877,6 +929,41 @@ public String toString() {
877929
}
878930
}
879931

932+
// this setting is intentionally not registered, it is only used in tests
933+
public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
934+
Setting.timeSetting(
935+
"index.global_checkpoint_sync.interval",
936+
new TimeValue(30, TimeUnit.SECONDS),
937+
new TimeValue(0, TimeUnit.MILLISECONDS),
938+
Property.Dynamic,
939+
Property.IndexScope);
940+
941+
/**
942+
* Background task that syncs the global checkpoint to replicas.
943+
*/
944+
final class AsyncGlobalCheckpointTask extends BaseAsyncTask {
945+
946+
AsyncGlobalCheckpointTask(final IndexService indexService) {
947+
// index.global_checkpoint_sync.interval is not a real setting, it is only registered in tests
948+
super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
949+
}
950+
951+
@Override
952+
protected void runInternal() {
953+
indexService.maybeSyncGlobalCheckpoints();
954+
}
955+
956+
@Override
957+
protected String getThreadPool() {
958+
return ThreadPool.Names.GENERIC;
959+
}
960+
961+
@Override
962+
public String toString() {
963+
return "global_checkpoint_sync";
964+
}
965+
}
966+
880967
AsyncRefreshTask getRefreshTask() { // for tests
881968
return refreshTask;
882969
}
@@ -885,6 +972,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests
885972
return fsyncTask;
886973
}
887974

975+
AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
976+
return globalCheckpointTask;
977+
}
978+
888979
/**
889980
* Clears the caches for the given shard id if the shard is still allocated on this node
890981
*/

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import org.apache.lucene.store.AlreadyClosedException;
23+
import org.elasticsearch.ExceptionsHelper;
2224
import org.elasticsearch.Version;
25+
import org.elasticsearch.action.ActionFuture;
2326
import org.elasticsearch.action.ActionListener;
2427
import org.elasticsearch.action.support.ActionFilters;
2528
import org.elasticsearch.action.support.replication.ReplicationOperation;
@@ -34,6 +37,7 @@
3437
import org.elasticsearch.common.settings.Settings;
3538
import org.elasticsearch.index.shard.IndexEventListener;
3639
import org.elasticsearch.index.shard.IndexShard;
40+
import org.elasticsearch.index.shard.IndexShardClosedException;
3741
import org.elasticsearch.index.shard.ShardId;
3842
import org.elasticsearch.indices.IndicesService;
3943
import org.elasticsearch.threadpool.ThreadPool;
@@ -47,7 +51,7 @@
4751
public class GlobalCheckpointSyncAction extends TransportReplicationAction<
4852
GlobalCheckpointSyncAction.Request,
4953
GlobalCheckpointSyncAction.Request,
50-
ReplicationResponse> implements IndexEventListener {
54+
ReplicationResponse> {
5155

5256
public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync";
5357

@@ -73,7 +77,17 @@ public GlobalCheckpointSyncAction(
7377
indexNameExpressionResolver,
7478
Request::new,
7579
Request::new,
76-
ThreadPool.Names.SAME);
80+
ThreadPool.Names.MANAGEMENT);
81+
}
82+
83+
public void updateGlobalCheckpointForShard(final ShardId shardId) {
84+
execute(
85+
new Request(shardId),
86+
ActionListener.wrap(r -> {}, e -> {
87+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
88+
logger.info(shardId + " global checkpoint sync failed", e);
89+
}
90+
}));
7791
}
7892

7993
@Override
@@ -94,11 +108,6 @@ protected void sendReplicaRequest(
94108
}
95109
}
96110

97-
@Override
98-
public void onShardInactive(final IndexShard indexShard) {
99-
execute(new Request(indexShard.shardId()));
100-
}
101-
102111
@Override
103112
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
104113
final Request request, final IndexShard indexShard) throws Exception {

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,20 @@ public int hashCode() {
209209
}
210210
}
211211

212-
synchronized ObjectLongMap<String> getGlobalCheckpoints() {
212+
/**
213+
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
214+
*
215+
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
216+
*/
217+
synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
213218
assert primaryMode;
214219
assert handoffInProgress == false;
215-
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size());
216-
for (final Map.Entry<String, CheckpointState> cps : checkpoints.entrySet()) {
217-
globalCheckpoints.put(cps.getKey(), cps.getValue().globalCheckpoint);
218-
}
220+
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
221+
checkpoints
222+
.entrySet()
223+
.stream()
224+
.filter(e -> e.getValue().inSync)
225+
.forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint));
219226
return globalCheckpoints;
220227
}
221228

0 commit comments

Comments
 (0)