Skip to content

Commit e41eae9

Browse files
authored
Live primary-replica resync (no rollback) (#24841)
Adds a replication task that streams all operations from the primary's global checkpoint to all replicas.
1 parent 44e9c0b commit e41eae9

19 files changed

+1071
-75
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -531,38 +531,6 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
531531
}
532532
}
533533

534-
/** Syncs operation result to the translog or throws a shard not available failure */
535-
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
536-
final Translog.Location currentLocation) throws Exception {
537-
final Translog.Location location;
538-
if (operationResult.hasFailure()) {
539-
// check if any transient write operation failures should be bubbled up
540-
Exception failure = operationResult.getFailure();
541-
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
542-
if (!TransportActions.isShardNotAvailableException(failure)) {
543-
throw failure;
544-
} else {
545-
location = currentLocation;
546-
}
547-
} else {
548-
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
549-
}
550-
return location;
551-
}
552-
553-
private static Translog.Location locationToSync(Translog.Location current,
554-
Translog.Location next) {
555-
/* here we are moving forward in the translog with each operation. Under the hood this might
556-
* cross translog files which is ok since from the user perspective the translog is like a
557-
* tape where only the highest location needs to be fsynced in order to sync all previous
558-
* locations even though they are not in the same file. When the translog rolls over files
559-
* the previous file is fsynced on after closing if needed.*/
560-
assert next != null : "next operation can't be null";
561-
assert current == null || current.compareTo(next) < 0 :
562-
"translog locations are not increasing";
563-
return next;
564-
}
565-
566534
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
567535
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
568536
MappingUpdatePerformer mappingUpdater) throws Exception {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.resync;
20+
21+
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.index.shard.ShardId;
25+
import org.elasticsearch.index.translog.Translog;
26+
27+
import java.io.IOException;
28+
import java.util.List;
29+
30+
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
31+
32+
private List<Translog.Operation> operations;
33+
34+
ResyncReplicationRequest() {
35+
super();
36+
}
37+
38+
public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
39+
super(shardId);
40+
this.operations = operations;
41+
}
42+
43+
public List<Translog.Operation> getOperations() {
44+
return operations;
45+
}
46+
47+
@Override
48+
public void readFrom(StreamInput in) throws IOException {
49+
super.readFrom(in);
50+
operations = in.readList(Translog.Operation::readType);
51+
}
52+
53+
@Override
54+
public void writeTo(StreamOutput out) throws IOException {
55+
super.writeTo(out);
56+
out.writeList(operations);
57+
}
58+
59+
@Override
60+
public String toString() {
61+
return "TransportResyncReplicationAction.Request{" +
62+
"shardId=" + shardId +
63+
", timeout=" + timeout +
64+
", index='" + index + '\'' +
65+
", ops=" + operations.size() +
66+
"}";
67+
}
68+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.resync;
20+
21+
import org.elasticsearch.action.support.WriteResponse;
22+
import org.elasticsearch.action.support.replication.ReplicationResponse;
23+
24+
public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {
25+
26+
@Override
27+
public void setForcedRefresh(boolean forcedRefresh) {
28+
// ignore
29+
}
30+
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.resync;
20+
21+
import org.elasticsearch.Version;
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.support.ActionFilters;
24+
import org.elasticsearch.action.support.TransportActions;
25+
import org.elasticsearch.action.support.replication.ReplicationOperation;
26+
import org.elasticsearch.action.support.replication.TransportReplicationAction;
27+
import org.elasticsearch.action.support.replication.TransportWriteAction;
28+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
29+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
30+
import org.elasticsearch.cluster.node.DiscoveryNode;
31+
import org.elasticsearch.cluster.service.ClusterService;
32+
import org.elasticsearch.common.inject.Inject;
33+
import org.elasticsearch.common.settings.Settings;
34+
import org.elasticsearch.index.engine.Engine;
35+
import org.elasticsearch.index.seqno.SequenceNumbersService;
36+
import org.elasticsearch.index.shard.IndexShard;
37+
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
38+
import org.elasticsearch.index.translog.Translog;
39+
import org.elasticsearch.indices.IndicesService;
40+
import org.elasticsearch.tasks.Task;
41+
import org.elasticsearch.threadpool.ThreadPool;
42+
import org.elasticsearch.transport.TransportException;
43+
import org.elasticsearch.transport.TransportResponseHandler;
44+
import org.elasticsearch.transport.TransportService;
45+
46+
import java.util.function.Supplier;
47+
48+
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
49+
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
50+
51+
public static String ACTION_NAME = "indices:admin/seq_no/resync";
52+
53+
@Inject
54+
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
55+
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
56+
ShardStateAction shardStateAction, ActionFilters actionFilters,
57+
IndexNameExpressionResolver indexNameExpressionResolver) {
58+
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
59+
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
60+
}
61+
62+
@Override
63+
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
64+
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
65+
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
66+
// we should never reject resync because of thread pool capacity on primary
67+
transportService.registerRequestHandler(transportPrimaryAction,
68+
() -> new ConcreteShardRequest<>(request),
69+
executor, true, true,
70+
new PrimaryOperationTransportHandler());
71+
transportService.registerRequestHandler(transportReplicaAction,
72+
() -> new ConcreteReplicaRequest<>(replicaRequest),
73+
executor, true, true,
74+
new ReplicaOperationTransportHandler());
75+
}
76+
77+
@Override
78+
protected ResyncReplicationResponse newResponseInstance() {
79+
return new ResyncReplicationResponse();
80+
}
81+
82+
@Override
83+
protected ReplicationOperation.Replicas newReplicasProxy() {
84+
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
85+
return new ReplicasProxy();
86+
}
87+
88+
@Override
89+
protected void sendReplicaRequest(
90+
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
91+
final DiscoveryNode node,
92+
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
93+
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
94+
super.sendReplicaRequest(replicaRequest, node, listener);
95+
} else {
96+
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
97+
}
98+
}
99+
100+
@Override
101+
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
102+
ResyncReplicationRequest request, IndexShard primary) throws Exception {
103+
final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
104+
return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
105+
}
106+
107+
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
108+
return request;
109+
}
110+
111+
@Override
112+
protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
113+
Translog.Location location = performOnReplica(request, replica);
114+
return new WriteReplicaResult(request, location, null, replica, logger);
115+
}
116+
117+
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
118+
Translog.Location location = null;
119+
for (Translog.Operation operation : request.getOperations()) {
120+
try {
121+
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
122+
update -> {
123+
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
124+
"Mappings are not available on the replica yet, triggered update: " + update);
125+
});
126+
location = syncOperationResultOrThrow(operationResult, location);
127+
} catch (Exception e) {
128+
// if its not a failure to be ignored, let it bubble up
129+
if (!TransportActions.isShardNotAvailableException(e)) {
130+
throw e;
131+
}
132+
}
133+
}
134+
return location;
135+
}
136+
137+
@Override
138+
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
139+
ActionListener<ResyncReplicationResponse> listener) {
140+
// skip reroute phase
141+
transportService.sendChildRequest(
142+
clusterService.localNode(),
143+
transportPrimaryAction,
144+
new ConcreteShardRequest<>(request, primaryAllocationId),
145+
parentTask,
146+
transportOptions,
147+
new TransportResponseHandler<ResyncReplicationResponse>() {
148+
@Override
149+
public ResyncReplicationResponse newInstance() {
150+
return newResponseInstance();
151+
}
152+
153+
@Override
154+
public String executor() {
155+
return ThreadPool.Names.SAME;
156+
}
157+
158+
@Override
159+
public void handleResponse(ResyncReplicationResponse response) {
160+
listener.onResponse(response);
161+
}
162+
163+
@Override
164+
public void handleException(TransportException exp) {
165+
final Throwable cause = exp.unwrapCause();
166+
if (TransportActions.isShardNotAvailableException(cause)) {
167+
logger.trace("primary became unavailable during resync, ignoring", exp);
168+
} else {
169+
listener.onFailure(exp);
170+
}
171+
}
172+
});
173+
}
174+
175+
}

0 commit comments

Comments
 (0)