Skip to content

Commit ac164e5

Browse files
committed
Primary-replica resync
1 parent 929194e commit ac164e5

19 files changed

+1020
-74
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -531,38 +531,6 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
531531
}
532532
}
533533

534-
/** Syncs operation result to the translog or throws a shard not available failure */
535-
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
536-
final Translog.Location currentLocation) throws Exception {
537-
final Translog.Location location;
538-
if (operationResult.hasFailure()) {
539-
// check if any transient write operation failures should be bubbled up
540-
Exception failure = operationResult.getFailure();
541-
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
542-
if (!TransportActions.isShardNotAvailableException(failure)) {
543-
throw failure;
544-
} else {
545-
location = currentLocation;
546-
}
547-
} else {
548-
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
549-
}
550-
return location;
551-
}
552-
553-
private static Translog.Location locationToSync(Translog.Location current,
554-
Translog.Location next) {
555-
/* here we are moving forward in the translog with each operation. Under the hood this might
556-
* cross translog files which is ok since from the user perspective the translog is like a
557-
* tape where only the highest location needs to be fsynced in order to sync all previous
558-
* locations even though they are not in the same file. When the translog rolls over files
559-
* the previous file is fsynced on after closing if needed.*/
560-
assert next != null : "next operation can't be null";
561-
assert current == null || current.compareTo(next) < 0 :
562-
"translog locations are not increasing";
563-
return next;
564-
}
565-
566534
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
567535
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
568536
MappingUpdatePerformer mappingUpdater) throws Exception {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.resync;
20+
21+
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.index.shard.ShardId;
25+
import org.elasticsearch.index.translog.Translog;
26+
27+
import java.io.IOException;
28+
import java.util.List;
29+
30+
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
31+
32+
private List<Translog.Operation> operations;
33+
34+
ResyncReplicationRequest() {
35+
super();
36+
}
37+
38+
public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
39+
super(shardId);
40+
this.operations = operations;
41+
}
42+
43+
public List<Translog.Operation> getOperations() {
44+
return operations;
45+
}
46+
47+
@Override
48+
public void readFrom(StreamInput in) throws IOException {
49+
super.readFrom(in);
50+
operations = in.readList(Translog.Operation::readType);
51+
}
52+
53+
@Override
54+
public void writeTo(StreamOutput out) throws IOException {
55+
super.writeTo(out);
56+
out.writeList(operations);
57+
}
58+
59+
@Override
60+
public String toString() {
61+
return "TransportResyncReplicationAction.Request{" +
62+
"shardId=" + shardId +
63+
", timeout=" + timeout +
64+
", index='" + index + '\'' +
65+
", ops=" + operations.size() +
66+
"}";
67+
}
68+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.resync;
20+
21+
import org.elasticsearch.action.support.WriteResponse;
22+
import org.elasticsearch.action.support.replication.ReplicationResponse;
23+
24+
public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {
25+
26+
@Override
27+
public void setForcedRefresh(boolean forcedRefresh) {
28+
// ignore
29+
}
30+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.resync;
20+
21+
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.action.support.ActionFilters;
23+
import org.elasticsearch.action.support.TransportActions;
24+
import org.elasticsearch.action.support.replication.ReplicationOperation;
25+
import org.elasticsearch.action.support.replication.TransportReplicationAction;
26+
import org.elasticsearch.action.support.replication.TransportWriteAction;
27+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
28+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
29+
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.common.inject.Inject;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.xcontent.XContentFactory;
33+
import org.elasticsearch.index.engine.Engine;
34+
import org.elasticsearch.index.mapper.SourceToParse;
35+
import org.elasticsearch.index.shard.IndexShard;
36+
import org.elasticsearch.index.shard.IndexShardState;
37+
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
38+
import org.elasticsearch.index.translog.Translog;
39+
import org.elasticsearch.indices.IndicesService;
40+
import org.elasticsearch.tasks.Task;
41+
import org.elasticsearch.threadpool.ThreadPool;
42+
import org.elasticsearch.transport.TransportException;
43+
import org.elasticsearch.transport.TransportResponseHandler;
44+
import org.elasticsearch.transport.TransportService;
45+
46+
import java.util.function.Supplier;
47+
48+
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
49+
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
50+
51+
public static String ACTION_NAME = "indices:admin/seq_no/resync";
52+
53+
@Inject
54+
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
55+
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
56+
ShardStateAction shardStateAction, ActionFilters actionFilters,
57+
IndexNameExpressionResolver indexNameExpressionResolver) {
58+
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
59+
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
60+
}
61+
62+
@Override
63+
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
64+
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
65+
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
66+
// we should never reject resync because of thread pool capacity on primary
67+
transportService.registerRequestHandler(transportPrimaryAction,
68+
() -> new ConcreteShardRequest<>(request),
69+
executor, true, true,
70+
new PrimaryOperationTransportHandler());
71+
transportService.registerRequestHandler(transportReplicaAction,
72+
() -> new ConcreteReplicaRequest<>(replicaRequest),
73+
executor, true, true,
74+
new ReplicaOperationTransportHandler());
75+
}
76+
77+
@Override
78+
protected ResyncReplicationResponse newResponseInstance() {
79+
return new ResyncReplicationResponse();
80+
}
81+
82+
@Override
83+
protected ReplicationOperation.Replicas newReplicasProxy() {
84+
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
85+
return new ReplicasProxy();
86+
}
87+
88+
@Override
89+
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
90+
ResyncReplicationRequest request, IndexShard primary) throws Exception {
91+
final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
92+
return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
93+
}
94+
95+
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
96+
return request;
97+
}
98+
99+
@Override
100+
protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
101+
Translog.Location location = performOnReplica(request, replica);
102+
return new WriteReplicaResult(request, location, null, replica, logger);
103+
}
104+
105+
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
106+
Translog.Location location = null;
107+
for (Translog.Operation operation : request.getOperations()) {
108+
try {
109+
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
110+
update -> {
111+
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
112+
"Mappings are not available on the replica yet, triggered update: " + update);
113+
});
114+
location = syncOperationResultOrThrow(operationResult, location);
115+
} catch (Exception e) {
116+
// if its not a failure to be ignored, let it bubble up
117+
if (!TransportActions.isShardNotAvailableException(e)) {
118+
throw e;
119+
}
120+
}
121+
}
122+
return location;
123+
}
124+
125+
@Override
126+
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
127+
ActionListener<ResyncReplicationResponse> listener) {
128+
// skip reroute phase
129+
transportService.sendChildRequest(
130+
clusterService.localNode(),
131+
transportPrimaryAction,
132+
new ConcreteShardRequest<>(request, primaryAllocationId),
133+
parentTask,
134+
transportOptions,
135+
new TransportResponseHandler<ResyncReplicationResponse>() {
136+
@Override
137+
public ResyncReplicationResponse newInstance() {
138+
return newResponseInstance();
139+
}
140+
141+
@Override
142+
public String executor() {
143+
return ThreadPool.Names.SAME;
144+
}
145+
146+
@Override
147+
public void handleResponse(ResyncReplicationResponse response) {
148+
listener.onResponse(response);
149+
}
150+
151+
@Override
152+
public void handleException(TransportException exp) {
153+
listener.onFailure(exp);
154+
}
155+
});
156+
}
157+
158+
}

0 commit comments

Comments
 (0)