Skip to content

Commit b857b31

Browse files
authored
Add BWC layer to seq no infra and enable BWC tests (#22185)
Sequence BWC logic consists of two elements: 1) Wire level BWC using stream versions. 2) A changed to the global checkpoint maintenance semantics. For the sequence number infra to work with a mixed version clusters, we have to consider situation where the primary is on an old node and replicas are on new ones (i.e., the replicas will receive operations without seq#) and also the reverse (i.e., the primary sends operations to a replica but the replica can't process the seq# and respond with local checkpoint). An new primary with an old replica is a rare because we do not allow a replica to recover from a new primary. However, it can occur if the old primary failed and a new replica was promoted or during primary relocation where the source primary is treated as a replica until the master starts the target. 1) Old Primary & New Replica - this case is easy as is taken care of by the wire level BWC. All incoming requests will have their seq# set to `UNASSIGNED_SEQ_NO`, which doesn't confuse the local checkpoint logic (keeping it at `NO_OPS_PERFORMED`) 2) New Primary & Old replica - this one is trickier as the global checkpoint service currently takes all in sync replicas into consideration for the global checkpoint calculation. In order to deal with old replicas, we change the semantics to say all *new node* in sync replicas. That means the replicas on old nodes don't count for the global checkpointing. In this state the seq# infra is not fully operational (you can't search on it, because copies may miss it) but it is maintained on shards that can support it. The old replicas will have to go through a file based recovery at some point and will get the seq# information at that point. There is still an edge case where a new primary fails and an old replica takes over. I'lll discuss this one with @ywelsch as I prefer to avoid it completely. This PR also re-enables the BWC tests which were disabled. As such it had to fix any BWC issue that had crept in. Most notably an issue with the removal of the `timestamp` field in #21670. The commit also includes a fix for the default value of the seq number field in replicated write requests (it was 0 but should be -2), that surface some other minor bugs which are fixed as well. Last - I added some debugging tools like more sane node names and forcing replication request to implement a `toString`
1 parent b58bbb9 commit b857b31

File tree

27 files changed

+519
-89
lines changed

27 files changed

+519
-89
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,14 @@ dependency-reduced-pom.xml
3838
# osx stuff
3939
.DS_Store
4040

41+
# default folders in which the create_bwc_index.py expects to find old es versions in
42+
/backwards
43+
/dev-tools/backwards
44+
4145
# needed in case docs build is run...maybe we can configure doc build to generate files under build?
4246
html_docs
4347

4448
# random old stuff that we should look at the necessity of...
4549
/tmp/
46-
backwards/
4750
eclipse-build
4851

buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ class ClusterFormationTasks {
268268
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) {
269269
Map esConfig = [
270270
'cluster.name' : node.clusterName,
271+
'node.name' : "node-" + node.nodeNum,
271272
'pidfile' : node.pidFile,
272273
'path.repo' : "${node.sharedDir}/repo",
273274
'path.shared_data' : "${node.sharedDir}/",

core/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.action;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.action.support.WriteRequest;
2223
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
2324
import org.elasticsearch.action.support.WriteResponse;
@@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException {
214215
type = in.readString();
215216
id = in.readString();
216217
version = in.readZLong();
217-
seqNo = in.readZLong();
218+
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
219+
seqNo = in.readZLong();
220+
} else {
221+
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
222+
}
218223
forcedRefresh = in.readBoolean();
219224
result = Result.readFrom(in);
220225
}
@@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException {
226231
out.writeString(type);
227232
out.writeString(id);
228233
out.writeZLong(version);
229-
out.writeZLong(seqNo);
234+
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
235+
out.writeZLong(seqNo);
236+
}
230237
out.writeBoolean(forcedRefresh);
231238
result.writeTo(out);
232239
}

core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException {
5858

5959
@Override
6060
public String toString() {
61-
return "flush {" + super.toString() + "}";
61+
return "flush {" + shardId + "}";
6262
}
6363
}

core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.stats;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.routing.ShardRouting;
2324
import org.elasticsearch.common.Nullable;
2425
import org.elasticsearch.common.io.stream.StreamInput;
@@ -103,7 +104,9 @@ public void readFrom(StreamInput in) throws IOException {
103104
statePath = in.readString();
104105
dataPath = in.readString();
105106
isCustomDataPath = in.readBoolean();
106-
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
107+
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
108+
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
109+
}
107110
}
108111

109112
@Override
@@ -114,7 +117,9 @@ public void writeTo(StreamOutput out) throws IOException {
114117
out.writeString(statePath);
115118
out.writeString(dataPath);
116119
out.writeBoolean(isCustomDataPath);
117-
out.writeOptionalWriteable(seqNoStats);
120+
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
121+
out.writeOptionalWriteable(seqNoStats);
122+
}
118123
}
119124

120125
@Override

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.elasticsearch.index.engine.EngineClosedException;
5151
import org.elasticsearch.index.engine.VersionConflictEngineException;
5252
import org.elasticsearch.index.mapper.MapperParsingException;
53-
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
5453
import org.elasticsearch.index.seqno.SequenceNumbersService;
5554
import org.elasticsearch.index.shard.IndexShard;
5655
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -151,7 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
151150
final long version = indexResult.getVersion();
152151
indexRequest.version(version);
153152
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
154-
indexRequest.seqNo(indexResult.getSeqNo());
153+
indexRequest.setSeqNo(indexResult.getSeqNo());
155154
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
156155
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
157156
indexResult.getVersion(), indexResult.isCreated());
@@ -175,7 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
175174
// update the request with the version so it will go to the replicas
176175
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
177176
deleteRequest.version(deleteResult.getVersion());
178-
deleteRequest.seqNo(deleteResult.getSeqNo());
177+
deleteRequest.setSeqNo(deleteResult.getSeqNo());
179178
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
180179
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
181180
deleteResult.getVersion(), deleteResult.isFound());
@@ -286,7 +285,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
286285
final long version = updateOperationResult.getVersion();
287286
indexRequest.version(version);
288287
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
289-
indexRequest.seqNo(updateOperationResult.getSeqNo());
288+
indexRequest.setSeqNo(updateOperationResult.getSeqNo());
290289
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
291290
}
292291
break;
@@ -297,7 +296,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
297296
// update the request with the version so it will go to the replicas
298297
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
299298
deleteRequest.version(updateOperationResult.getVersion());
300-
deleteRequest.seqNo(updateOperationResult.getSeqNo());
299+
deleteRequest.setSeqNo(updateOperationResult.getSeqNo());
301300
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
302301
}
303302
break;
@@ -349,9 +348,9 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
349348
break;
350349
}
351350
assert (replicaRequest.request() instanceof IndexRequest
352-
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
351+
&& ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
353352
(replicaRequest.request() instanceof DeleteRequest
354-
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
353+
&& ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
355354
// successful operation
356355
break; // out of retry loop
357356
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {

core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.delete;
2121

2222
import org.elasticsearch.ExceptionsHelper;
23+
import org.elasticsearch.ResourceAlreadyExistsException;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.RoutingMissingException;
2526
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -39,7 +40,6 @@
3940
import org.elasticsearch.index.engine.Engine;
4041
import org.elasticsearch.index.shard.IndexShard;
4142
import org.elasticsearch.index.shard.ShardId;
42-
import org.elasticsearch.ResourceAlreadyExistsException;
4343
import org.elasticsearch.indices.IndicesService;
4444
import org.elasticsearch.tasks.Task;
4545
import org.elasticsearch.threadpool.ThreadPool;
@@ -125,12 +125,14 @@ protected DeleteResponse newResponseInstance() {
125125
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
126126
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
127127
final DeleteResponse response;
128+
final DeleteRequest replicaRequest;
128129
if (result.hasFailure() == false) {
129130
// update the request with the version so it will go to the replicas
130131
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
131132
request.version(result.getVersion());
132-
request.seqNo(result.getSeqNo());
133+
request.setSeqNo(result.getSeqNo());
133134
assert request.versionType().validateVersionForWrites(request.version());
135+
replicaRequest = request;
134136
response = new DeleteResponse(
135137
primary.shardId(),
136138
request.type(),
@@ -140,8 +142,9 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
140142
result.isFound());
141143
} else {
142144
response = null;
145+
replicaRequest = null;
143146
}
144-
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
147+
return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary);
145148
}
146149

147150
@Override
@@ -158,7 +161,7 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re
158161

159162
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
160163
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
161-
request.seqNo(), request.primaryTerm(), request.version(), request.versionType());
164+
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
162165
return replica.delete(delete);
163166
}
164167

core/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,10 @@ public void writeTo(StreamOutput out) throws IOException {
524524
out.writeOptionalString(routing);
525525
out.writeOptionalString(parent);
526526
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
527-
out.writeOptionalString(null);
527+
// Serialize a fake timestamp. 5.x expect this value to be set by the #process method so we can't use null.
528+
// On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for
529+
// the transport layer OK as it will be ignored.
530+
out.writeOptionalString("0");
528531
out.writeOptionalWriteable(null);
529532
}
530533
out.writeBytesReference(source);

core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,19 +165,22 @@ protected IndexResponse newResponseInstance() {
165165
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
166166
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
167167
final IndexResponse response;
168+
final IndexRequest replicaRequest;
168169
if (indexResult.hasFailure() == false) {
169170
// update the version on request so it will happen on the replicas
170171
final long version = indexResult.getVersion();
171172
request.version(version);
172173
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
173-
request.seqNo(indexResult.getSeqNo());
174+
request.setSeqNo(indexResult.getSeqNo());
174175
assert request.versionType().validateVersionForWrites(request.version());
176+
replicaRequest = request;
175177
response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(),
176178
indexResult.getVersion(), indexResult.isCreated());
177179
} else {
178180
response = null;
181+
replicaRequest = null;
179182
}
180-
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
183+
return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
181184
}
182185

183186
@Override
@@ -197,9 +200,9 @@ public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest reque
197200

198201
final Engine.Index operation;
199202
try {
200-
operation = replica.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
203+
operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
201204
} catch (MapperParsingException e) {
202-
return new Engine.IndexResult(e, request.version(), request.seqNo());
205+
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
203206
}
204207
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
205208
if (update != null) {
@@ -221,7 +224,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
221224
try {
222225
operation = prepareIndexOperationOnPrimary(request, primary);
223226
} catch (MapperParsingException | IllegalArgumentException e) {
224-
return new Engine.IndexResult(e, request.version(), request.seqNo());
227+
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
225228
}
226229
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
227230
final ShardId shardId = primary.shardId();
@@ -232,12 +235,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
232235
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
233236
} catch (IllegalArgumentException e) {
234237
// throws IAE on conflicts merging dynamic mappings
235-
return new Engine.IndexResult(e, request.version(), request.seqNo());
238+
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
236239
}
237240
try {
238241
operation = prepareIndexOperationOnPrimary(request, primary);
239242
} catch (MapperParsingException | IllegalArgumentException e) {
240-
return new Engine.IndexResult(e, request.version(), request.seqNo());
243+
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
241244
}
242245
update = operation.parsedDoc().dynamicMappingsUpdate();
243246
if (update != null) {

core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,9 @@ public BasicReplicationRequest() {
3737
public BasicReplicationRequest(ShardId shardId) {
3838
super(shardId);
3939
}
40+
41+
@Override
42+
public String toString() {
43+
return "BasicReplicationRequest{" + shardId + "}";
44+
}
4045
}

0 commit comments

Comments
 (0)