Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -531,38 +531,6 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
}
}

/** Syncs operation result to the translog or throws a shard not available failure */
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
final Translog.Location location;
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = operationResult.getFailure();
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
} else {
location = currentLocation;
}
} else {
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
}
return location;
}

private static Translog.Location locationToSync(Translog.Location current,
Translog.Location next) {
/* here we are moving forward in the translog with each operation. Under the hood this might
* cross translog files which is ok since from the user perspective the translog is like a
* tape where only the highest location needs to be fsynced in order to sync all previous
* locations even though they are not in the same file. When the translog rolls over files
* the previous file is fsynced on after closing if needed.*/
assert next != null : "next operation can't be null";
assert current == null || current.compareTo(next) < 0 :
"translog locations are not increasing";
return next;
}

/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;

import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;

public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

private List<Translog.Operation> operations;

ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
super(shardId);
this.operations = operations;
}

public List<Translog.Operation> getOperations() {
return operations;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operations = in.readList(Translog.Operation::readType);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(operations);
}

@Override
public String toString() {
return "TransportResyncReplicationAction.Request{" +
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", ops=" + operations.size() +
"}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;

import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;

public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {

@Override
public void setForcedRefresh(boolean forcedRefresh) {
// ignore
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a BWC layer here, ala GlobalCheckpointSyncAction#sendReplicaRequest

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also a BWC test will be nice...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BWC test... sigh. A rolling upgrade should cover resyncs (e.g. the existing rolling-upgrade test that we have in qa/rolling-upgrade). For the resync to actually send out a request to a 5.6 node, the test would require 3 nodes (and an index with 2 replicas), which will make the rolling-upgrade test even slower as it is. I'm not sure that's a good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened #25336 for discusssion

ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {

public static String ACTION_NAME = "indices:admin/seq_no/resync";

@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
}

@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
new PrimaryOperationTransportHandler());
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
}

@Override
protected ResyncReplicationResponse newResponseInstance() {
return new ResyncReplicationResponse();
}

@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
return new ReplicasProxy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do want to fail the shard if it was assigned, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we agreed to handle this in a follow-up. Not failing the shard will only be problematic when index.number_of_replicas is > 1, the resync failed but subsequent index operations are replicated just fine.

}

@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}

@Override
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
ResyncReplicationRequest request, IndexShard primary) throws Exception {
final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you add this extra empty method? can we clean it up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in the tests. This ensures that the ResyncAction in the tests stays in sync with TransportResyncReplicationAction (e.g. if we would add more code there).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. I missed that usage.

return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
}

public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
return request;
}

@Override
protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult(request, location, null, replica, logger);
}

public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (Translog.Operation operation : request.getOperations()) {
try {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
update -> {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + update);
});
location = syncOperationResultOrThrow(operationResult, location);
} catch (Exception e) {
// if its not a failure to be ignored, let it bubble up
if (!TransportActions.isShardNotAvailableException(e)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to let the higher up components deal (i.e., ReplicationOperation) with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied it over from TransportShardBulkAction to ensure that it is consistent with that one. I think it makes sense to let it bubble up. If we change the behavior for TransportShardBulkAction though, it should be a separate follow-up PR.

throw e;
}
}
}
return location;
}

@Override
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
ActionListener<ResyncReplicationResponse> listener) {
// skip reroute phase
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId),
parentTask,
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse newInstance() {
return newResponseInstance();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ResyncReplicationResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
final Throwable cause = exp.unwrapCause();
if (TransportActions.isShardNotAvailableException(cause)) {
logger.trace("primary became unavailable during resync, ignoring", exp);
} else {
listener.onFailure(exp);
}
}
});
}

}
Loading