Skip to content

Commit a6c0166

Browse files
committed
Renew retention leases while following (#39335)
This commit is the final piece of the integration of CCR with retention leases. Namely, we periodically renew retention leases and advance the retaining sequence number while following.
1 parent 7b8178c commit a6c0166

File tree

10 files changed

+788
-99
lines changed

10 files changed

+788
-99
lines changed

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,12 +540,16 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti
540540
new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute();
541541
}
542542

543-
public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
543+
public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
544544
ActionListener<ReplicationResponse> listener) {
545545
return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener);
546546
}
547547

548-
public void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
548+
public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) {
549+
return getPrimary().renewRetentionLease(id, retainingSequenceNumber, source);
550+
}
551+
552+
public synchronized void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
549553
getPrimary().removeRetentionLease(id, listener);
550554
}
551555

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.support.PlainActionFuture;
1111
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.common.settings.Setting;
1213
import org.elasticsearch.common.unit.TimeValue;
1314
import org.elasticsearch.index.Index;
1415
import org.elasticsearch.index.seqno.RetentionLeaseActions;
@@ -18,11 +19,18 @@
1819

1920
import java.util.Locale;
2021
import java.util.Optional;
21-
22-
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
22+
import java.util.concurrent.TimeUnit;
2323

2424
public class CcrRetentionLeases {
2525

26+
// this setting is intentionally not registered, it is only used in tests
27+
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
28+
Setting.timeSetting(
29+
"index.ccr.retention_lease.renew_interval",
30+
new TimeValue(5, TimeUnit.MINUTES),
31+
new TimeValue(0, TimeUnit.MILLISECONDS),
32+
Setting.Property.NodeScope);
33+
2634
/**
2735
* The retention lease ID used by followers.
2836
*
@@ -52,20 +60,22 @@ public static String retentionLeaseId(
5260
* Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
5361
* remote client. Note that this method will block up to the specified timeout.
5462
*
55-
* @param leaderShardId the leader shard ID
56-
* @param retentionLeaseId the retention lease ID
57-
* @param remoteClient the remote client on which to execute this request
58-
* @param timeout the timeout
63+
* @param leaderShardId the leader shard ID
64+
* @param retentionLeaseId the retention lease ID
65+
* @param retainingSequenceNumber the retaining sequence number
66+
* @param remoteClient the remote client on which to execute this request
67+
* @param timeout the timeout
5968
* @return an optional exception indicating whether or not the retention lease already exists
6069
*/
6170
public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
6271
final ShardId leaderShardId,
6372
final String retentionLeaseId,
73+
final long retainingSequenceNumber,
6474
final Client remoteClient,
6575
final TimeValue timeout) {
6676
try {
6777
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
68-
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
78+
asyncAddRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
6979
response.actionGet(timeout);
7080
return Optional.empty();
7181
} catch (final RetentionLeaseAlreadyExistsException e) {
@@ -78,39 +88,43 @@ public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLea
7888
* remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response
7989
* or failure.
8090
*
81-
* @param leaderShardId the leader shard ID
82-
* @param retentionLeaseId the retention lease ID
83-
* @param remoteClient the remote client on which to execute this request
84-
* @param listener the listener
91+
* @param leaderShardId the leader shard ID
92+
* @param retentionLeaseId the retention lease ID
93+
* @param retainingSequenceNumber the retaining sequence number
94+
* @param remoteClient the remote client on which to execute this request
95+
* @param listener the listener
8596
*/
8697
public static void asyncAddRetentionLease(
8798
final ShardId leaderShardId,
8899
final String retentionLeaseId,
100+
final long retainingSequenceNumber,
89101
final Client remoteClient,
90102
final ActionListener<RetentionLeaseActions.Response> listener) {
91103
final RetentionLeaseActions.AddRequest request =
92-
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
104+
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
93105
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
94106
}
95107

96108
/**
97109
* Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given
98110
* remote client. Note that this method will block up to the specified timeout.
99111
*
100-
* @param leaderShardId the leader shard ID
101-
* @param retentionLeaseId the retention lease ID
102-
* @param remoteClient the remote client on which to execute this request
103-
* @param timeout the timeout
112+
* @param leaderShardId the leader shard ID
113+
* @param retentionLeaseId the retention lease ID
114+
* @param retainingSequenceNumber the retaining sequence number
115+
* @param remoteClient the remote client on which to execute this request
116+
* @param timeout the timeout
104117
* @return an optional exception indicating whether or not the retention lease already exists
105118
*/
106119
public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
107120
final ShardId leaderShardId,
108121
final String retentionLeaseId,
122+
final long retainingSequenceNumber,
109123
final Client remoteClient,
110124
final TimeValue timeout) {
111125
try {
112126
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
113-
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
127+
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
114128
response.actionGet(timeout);
115129
return Optional.empty();
116130
} catch (final RetentionLeaseNotFoundException e) {
@@ -123,18 +137,20 @@ public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
123137
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
124138
* response or failure.
125139
*
126-
* @param leaderShardId the leader shard ID
127-
* @param retentionLeaseId the retention lease ID
128-
* @param remoteClient the remote client on which to execute this request
129-
* @param listener the listener
140+
* @param leaderShardId the leader shard ID
141+
* @param retentionLeaseId the retention lease ID
142+
* @param retainingSequenceNumber the retaining sequence number
143+
* @param remoteClient the remote client on which to execute this request
144+
* @param listener the listener
130145
*/
131146
public static void asyncRenewRetentionLease(
132147
final ShardId leaderShardId,
133148
final String retentionLeaseId,
149+
final long retainingSequenceNumber,
134150
final Client remoteClient,
135151
final ActionListener<RetentionLeaseActions.Response> listener) {
136152
final RetentionLeaseActions.RenewRequest request =
137-
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
153+
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
138154
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
139155
}
140156

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@
3030
import org.elasticsearch.node.NodeClosedException;
3131
import org.elasticsearch.persistent.AllocatedPersistentTask;
3232
import org.elasticsearch.tasks.TaskId;
33+
import org.elasticsearch.threadpool.Scheduler;
3334
import org.elasticsearch.transport.ConnectTransportException;
34-
import org.elasticsearch.xpack.ccr.Ccr;
3535
import org.elasticsearch.transport.NoSuchRemoteClusterException;
36+
import org.elasticsearch.xpack.ccr.Ccr;
3637
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
3738
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
3839

@@ -94,6 +95,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
9495

9596
private volatile ElasticsearchException fatalException;
9697

98+
private Scheduler.Cancellable renewable;
99+
100+
synchronized Scheduler.Cancellable getRenewable() {
101+
return renewable;
102+
}
103+
97104
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
98105
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
99106
super(id, type, action, description, parentTask, headers);
@@ -121,7 +128,8 @@ void start(
121128
final long followerMaxSeqNo) {
122129
/*
123130
* While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
124-
* avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
131+
* avoid the need to declare these fields as volatile. That is, we are ensuring these fields are always accessed under the same
132+
* lock.
125133
*/
126134
synchronized (this) {
127135
this.followerHistoryUUID = followerHistoryUUID;
@@ -130,6 +138,11 @@ void start(
130138
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
131139
this.followerMaxSeqNo = followerMaxSeqNo;
132140
this.lastRequestedSeqNo = followerGlobalCheckpoint;
141+
renewable = scheduleBackgroundRetentionLeaseRenewal(() -> {
142+
synchronized (ShardFollowNodeTask.this) {
143+
return this.followerGlobalCheckpoint;
144+
}
145+
});
133146
}
134147

135148
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
@@ -507,8 +520,16 @@ protected abstract void innerSendBulkShardOperationsRequest(String followerHisto
507520
protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
508521
Consumer<Exception> errorHandler);
509522

523+
protected abstract Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint);
524+
510525
@Override
511526
protected void onCancelled() {
527+
synchronized (this) {
528+
if (renewable != null) {
529+
renewable.cancel();
530+
renewable = null;
531+
}
532+
}
512533
markAsCompleted();
513534
}
514535

0 commit comments

Comments
 (0)