Skip to content

Commit 516dcb7

Browse files
committed
Rewrite shard follow node task logic
The current shard follow mechanism is complex and does not give us easy ways to have visibility into the system (e.g. why we are falling behind). The main reason it is complex is that the current design is highly asynchronous. Also, in the current model it is hard to apply backpressure other than by reducing the concurrent reads from the leader shard. This PR has the following changes: * Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single-threaded manner. This allows for better unit testing and makes it easier to add stats. * All write operations read from the shard changes api should be added to a buffer instead of being sent directly to the bulk shard operations api. This makes it possible to apply backpressure. In this PR there is a limit that controls how many write ops are allowed in the buffer, after which no new reads will be performed until the number of ops is below that limit. * The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self-sufficient process, instead of relying on a background thread to fetch the leader shard's global checkpoint. * Reading write operations from the leader shard (via shard changes api) is a separate step from writing the write operations (via bulk shards operations api), whereas before a read would immediately result in a write. * The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written. * Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask. * Moved over the changes from #31242 to make the shard follow mechanism resilient to node and shard failures. Relates to #30086
1 parent a55f614 commit 516dcb7

17 files changed

+1108
-961
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,12 @@ public static class Request extends ActionRequest {
7171

7272
private String leaderIndex;
7373
private String followIndex;
74-
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
75-
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
76-
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;
74+
private int maxReadSize = ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE;
75+
private int maxConcurrentReads = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READS;
76+
private long processorMaxTranslogBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES;
77+
private int maxWriteSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE;
78+
private int maxConcurrentWrites = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITES;
79+
private int maxBufferSize = ShardFollowNodeTask.DEFAULT_MAX_BUFFER_SIZE;
7780

7881
public String getLeaderIndex() {
7982
return leaderIndex;
@@ -91,23 +94,23 @@ public void setFollowIndex(String followIndex) {
9194
this.followIndex = followIndex;
9295
}
9396

94-
public long getBatchSize() {
95-
return batchSize;
97+
public int getMaxReadSize() {
98+
return maxReadSize;
9699
}
97100

98-
public void setBatchSize(long batchSize) {
99-
if (batchSize < 1) {
100-
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]");
101+
public void setMaxReadSize(int maxReadSize) {
102+
if (maxReadSize < 1) {
103+
throw new IllegalArgumentException("Illegal batch_size [" + maxReadSize + "]");
101104
}
102105

103-
this.batchSize = batchSize;
106+
this.maxReadSize = maxReadSize;
104107
}
105108

106-
public void setConcurrentProcessors(int concurrentProcessors) {
107-
if (concurrentProcessors < 1) {
109+
public void setMaxConcurrentReads(int maxConcurrentReads) {
110+
if (maxConcurrentReads < 1) {
108111
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
109112
}
110-
this.concurrentProcessors = concurrentProcessors;
113+
this.maxConcurrentReads = maxConcurrentReads;
111114
}
112115

113116
public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
@@ -117,6 +120,39 @@ public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
117120
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
118121
}
119122

123+
public int getMaxWriteSize() {
124+
return maxWriteSize;
125+
}
126+
127+
public void setMaxWriteSize(int maxWriteSize) {
128+
if (maxWriteSize < 1) {
129+
throw new IllegalArgumentException("maxWriteSize must be larger than 0");
130+
}
131+
this.maxWriteSize = maxWriteSize;
132+
}
133+
134+
public int getMaxConcurrentWrites() {
135+
return maxConcurrentWrites;
136+
}
137+
138+
public void setMaxConcurrentWrites(int maxConcurrentWrites) {
139+
if (maxConcurrentWrites < 1) {
140+
throw new IllegalArgumentException("maxConcurrentWrites must be larger than 0");
141+
}
142+
this.maxConcurrentWrites = maxConcurrentWrites;
143+
}
144+
145+
public int getMaxBufferSize() {
146+
return maxBufferSize;
147+
}
148+
149+
public void setMaxBufferSize(int maxBufferSize) {
150+
if (maxBufferSize < 1) {
151+
throw new IllegalArgumentException("maxBufferSize must be larger than 0");
152+
}
153+
this.maxBufferSize = maxBufferSize;
154+
}
155+
120156
@Override
121157
public ActionRequestValidationException validate() {
122158
return null;
@@ -127,36 +163,46 @@ public void readFrom(StreamInput in) throws IOException {
127163
super.readFrom(in);
128164
leaderIndex = in.readString();
129165
followIndex = in.readString();
130-
batchSize = in.readVLong();
131-
concurrentProcessors = in.readVInt();
166+
maxReadSize = in.readVInt();
167+
maxConcurrentReads = in.readVInt();
132168
processorMaxTranslogBytes = in.readVLong();
169+
maxWriteSize = in.readVInt();
170+
maxConcurrentWrites = in.readVInt();
171+
maxBufferSize = in.readVInt();
133172
}
134173

135174
@Override
136175
public void writeTo(StreamOutput out) throws IOException {
137176
super.writeTo(out);
138177
out.writeString(leaderIndex);
139178
out.writeString(followIndex);
140-
out.writeVLong(batchSize);
141-
out.writeVInt(concurrentProcessors);
179+
out.writeVInt(maxReadSize);
180+
out.writeVInt(maxConcurrentReads);
142181
out.writeVLong(processorMaxTranslogBytes);
182+
out.writeVInt(maxWriteSize);
183+
out.writeVInt(maxConcurrentWrites);
184+
out.writeVInt(maxBufferSize);
143185
}
144186

145187
@Override
146188
public boolean equals(Object o) {
147189
if (this == o) return true;
148190
if (o == null || getClass() != o.getClass()) return false;
149191
Request request = (Request) o;
150-
return batchSize == request.batchSize &&
151-
concurrentProcessors == request.concurrentProcessors &&
192+
return maxReadSize == request.maxReadSize &&
193+
maxConcurrentReads == request.maxConcurrentReads &&
152194
processorMaxTranslogBytes == request.processorMaxTranslogBytes &&
195+
maxWriteSize == request.maxWriteSize &&
196+
maxConcurrentWrites == request.maxConcurrentWrites &&
197+
maxBufferSize == request.maxBufferSize &&
153198
Objects.equals(leaderIndex, request.leaderIndex) &&
154199
Objects.equals(followIndex, request.followIndex);
155200
}
156201

157202
@Override
158203
public int hashCode() {
159-
return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes);
204+
return Objects.hash(leaderIndex, followIndex, maxReadSize, maxConcurrentReads, processorMaxTranslogBytes,
205+
maxWriteSize, maxConcurrentWrites, maxBufferSize);
160206
}
161207
}
162208

@@ -254,7 +300,8 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe
254300
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
255301
new ShardId(followIndexMetadata.getIndex(), shardId),
256302
new ShardId(leaderIndexMetadata.getIndex(), shardId),
257-
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
303+
request.maxReadSize, request.maxConcurrentReads, request.processorMaxTranslogBytes,
304+
request.maxWriteSize, request.maxConcurrentWrites, request.maxBufferSize, filteredHeaders);
258305
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
259306
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
260307
@Override

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ public Response newResponse() {
5454
public static class Request extends SingleShardRequest<Request> {
5555

5656
private long minSeqNo;
57-
private long maxSeqNo;
57+
private Long maxSeqNo;
5858
private ShardId shardId;
59-
private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;
59+
private long maxTranslogsBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES;
6060

6161
public Request(ShardId shardId) {
6262
super(shardId.getIndexName());
@@ -78,11 +78,11 @@ public void setMinSeqNo(long minSeqNo) {
7878
this.minSeqNo = minSeqNo;
7979
}
8080

81-
public long getMaxSeqNo() {
81+
public Long getMaxSeqNo() {
8282
return maxSeqNo;
8383
}
8484

85-
public void setMaxSeqNo(long maxSeqNo) {
85+
public void setMaxSeqNo(Long maxSeqNo) {
8686
this.maxSeqNo = maxSeqNo;
8787
}
8888

@@ -100,7 +100,7 @@ public ActionRequestValidationException validate() {
100100
if (minSeqNo < 0) {
101101
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException);
102102
}
103-
if (maxSeqNo < minSeqNo) {
103+
if (maxSeqNo != null && maxSeqNo < minSeqNo) {
104104
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo ["
105105
+ maxSeqNo + "]", validationException);
106106
}
@@ -115,7 +115,7 @@ public ActionRequestValidationException validate() {
115115
public void readFrom(StreamInput in) throws IOException {
116116
super.readFrom(in);
117117
minSeqNo = in.readVLong();
118-
maxSeqNo = in.readVLong();
118+
maxSeqNo = in.readOptionalLong();
119119
shardId = ShardId.readShardId(in);
120120
maxTranslogsBytes = in.readVLong();
121121
}
@@ -124,7 +124,7 @@ public void readFrom(StreamInput in) throws IOException {
124124
public void writeTo(StreamOutput out) throws IOException {
125125
super.writeTo(out);
126126
out.writeVLong(minSeqNo);
127-
out.writeVLong(maxSeqNo);
127+
out.writeOptionalLong(maxSeqNo);
128128
shardId.writeTo(out);
129129
out.writeVLong(maxTranslogsBytes);
130130
}
@@ -136,7 +136,7 @@ public boolean equals(final Object o) {
136136
if (o == null || getClass() != o.getClass()) return false;
137137
final Request request = (Request) o;
138138
return minSeqNo == request.minSeqNo &&
139-
maxSeqNo == request.maxSeqNo &&
139+
Objects.equals(maxSeqNo, request.maxSeqNo) &&
140140
Objects.equals(shardId, request.shardId) &&
141141
maxTranslogsBytes == request.maxTranslogsBytes;
142142
}
@@ -150,20 +150,26 @@ public int hashCode() {
150150
public static final class Response extends ActionResponse {
151151

152152
private long indexMetadataVersion;
153+
private long leaderGlobalCheckpoint;
153154
private Translog.Operation[] operations;
154155

155156
Response() {
156157
}
157158

158-
Response(long indexMetadataVersion, final Translog.Operation[] operations) {
159+
Response(long indexMetadataVersion, long leaderGlobalCheckpoint, final Translog.Operation[] operations) {
159160
this.indexMetadataVersion = indexMetadataVersion;
161+
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
160162
this.operations = operations;
161163
}
162164

163165
public long getIndexMetadataVersion() {
164166
return indexMetadataVersion;
165167
}
166168

169+
public long getLeaderGlobalCheckpoint() {
170+
return leaderGlobalCheckpoint;
171+
}
172+
167173
public Translog.Operation[] getOperations() {
168174
return operations;
169175
}
@@ -172,13 +178,15 @@ public Translog.Operation[] getOperations() {
172178
public void readFrom(final StreamInput in) throws IOException {
173179
super.readFrom(in);
174180
indexMetadataVersion = in.readVLong();
181+
leaderGlobalCheckpoint = in.readZLong();
175182
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
176183
}
177184

178185
@Override
179186
public void writeTo(final StreamOutput out) throws IOException {
180187
super.writeTo(out);
181188
out.writeVLong(indexMetadataVersion);
189+
out.writeZLong(leaderGlobalCheckpoint);
182190
out.writeArray(Translog.Operation::writeOperation, operations);
183191
}
184192

@@ -188,13 +196,15 @@ public boolean equals(final Object o) {
188196
if (o == null || getClass() != o.getClass()) return false;
189197
final Response response = (Response) o;
190198
return indexMetadataVersion == response.indexMetadataVersion &&
191-
Arrays.equals(operations, response.operations);
199+
leaderGlobalCheckpoint == response.leaderGlobalCheckpoint &&
200+
Arrays.equals(operations, response.operations);
192201
}
193202

194203
@Override
195204
public int hashCode() {
196205
int result = 1;
197206
result += Objects.hashCode(indexMetadataVersion);
207+
result += Objects.hashCode(leaderGlobalCheckpoint);
198208
result += Arrays.hashCode(operations);
199209
return result;
200210
}
@@ -222,14 +232,16 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
222232
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
223233
IndexShard indexShard = indexService.getShard(request.getShard().id());
224234
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
235+
225236
// The following shard generates this request based on the global checkpoint on the primary copy on the leader.
226237
// Although this value might not have been synced to all replica copies on the leader, the requesting range
227238
// is guaranteed to be at most the local-checkpoint of any shard copies on the leader.
228-
assert request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + request.minSeqNo + "]," +
229-
" to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + indexShard.getLocalCheckpoint() + "]";
239+
assert request.maxSeqNo == null || request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" +
240+
request.minSeqNo + "]," + " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" +
241+
indexShard.getLocalCheckpoint() + "]";
230242
final Translog.Operation[] operations =
231243
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
232-
return new Response(indexMetaDataVersion, operations);
244+
return new Response(indexMetaDataVersion, indexShard.getGlobalCheckpoint(), operations);
233245
}
234246

235247
@Override
@@ -254,14 +266,15 @@ protected Response newResponse() {
254266

255267
private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
256268

257-
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo,
269+
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, Long maxSeqNo,
258270
long byteLimit) throws IOException {
259271
if (indexShard.state() != IndexShardState.STARTED) {
260272
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
261273
}
262274
int seenBytes = 0;
263275
final List<Translog.Operation> operations = new ArrayList<>();
264-
try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) {
276+
long max = maxSeqNo != null ? maxSeqNo : minSeqNo + 1000;
277+
try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, max, true)) {
265278
Translog.Operation op;
266279
while ((op = snapshot.next()) != null) {
267280
if (op.getSource() == null) {
@@ -274,6 +287,15 @@ static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long min
274287
break;
275288
}
276289
}
290+
} catch (IllegalStateException e) {
291+
// TODO: handle peek reads better.
292+
// Should this optional upper bound leak into the newLuceneChangesSnapshot(...) method?
293+
if (maxSeqNo != null) {
294+
throw e;
295+
} else if (e.getMessage().contains("prematurely terminated last_seen_seqno") == false) {
296+
// Only fail if there are gaps between the ops.
297+
throw e;
298+
}
277299
}
278300
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
279301
}

0 commit comments

Comments
 (0)