Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptions.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]ToXContentToBytes.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]BroadcastOperationRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]BroadcastResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]TransportBroadcastAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]AcknowledgedRequest.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.List;
Expand All @@ -42,11 +44,22 @@ public class BroadcastResponse extends ActionResponse {
public BroadcastResponse() {
}

public BroadcastResponse(int totalShards, int successfulShards, int failedShards, List<? extends ShardOperationFailedException> shardFailures) {
public BroadcastResponse(int totalShards, int successfulShards, int failedShards,
List<? extends ShardOperationFailedException> shardFailures) {
assertNoShardNotAvailableFailures(shardFailures);
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.failedShards = failedShards;
this.shardFailures = shardFailures == null ? EMPTY : shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]);
this.shardFailures = shardFailures == null ? EMPTY :
shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]);
}

private void assertNoShardNotAvailableFailures(List<? extends ShardOperationFailedException> shardFailures) {
if (shardFailures != null) {
for (Object e : shardFailures) {
assert (e instanceof ShardNotFoundException) == false : "expected no ShardNotFoundException failures, but got " + e;
}
}
}

/**
Expand All @@ -70,6 +83,17 @@ public int getFailedShards() {
return failedShards;
}

/**
* The REST status that should be used for the response
*/
public RestStatus getStatus() {
if (failedShards > 0) {
return shardFailures[0].status();
} else {
return RestStatus.OK;
}
}

/**
* The list of shard failures exception.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<Str
for (String allocationId : Sets.difference(inSyncAllocationIds, availableAllocationIds)) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStale(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
Expand Down Expand Up @@ -209,14 +209,9 @@ public void onFailure(Exception replicaException) {
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
logger.warn(
(org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", shard.shardId(), message), replicaException);
replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
replicasProxy.failShardIfNeeded(shard, replicaRequest.primaryTerm(), message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
}
});
Expand Down Expand Up @@ -314,10 +309,13 @@ private void finishAsFailed(Exception exception) {
}
}

/**
* An encapsulation of an operation that is to be performed on the primary shard
*/
public interface Primary<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
PrimaryResultT extends PrimaryResult<ReplicaRequest>
RequestT extends ReplicationRequest<RequestT>,
ReplicaRequestT extends ReplicationRequest<ReplicaRequestT>,
PrimaryResultT extends PrimaryResult<ReplicaRequestT>
> {

/**
Expand All @@ -338,7 +336,7 @@ public interface Primary<
* @param request the request to perform
* @return the request to send to the repicas
*/
PrimaryResultT perform(Request request) throws Exception;
PrimaryResultT perform(RequestT request) throws Exception;


/**
Expand All @@ -355,7 +353,10 @@ public interface Primary<
long localCheckpoint();
}

public interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
/**
* An encapsulation of an operation that will be executed on the replica shards, if present.
*/
public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {

/**
* performs the the given request on the specified replica
Expand All @@ -364,24 +365,29 @@ public interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaReque
* @param replicaRequest operation to peform
* @param listener a callback to call once the operation has been complicated, either successfully or with an error.
*/
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest, ActionListener<ReplicaResponse> listener);

/**
* Fail the specified shard, removing it from the current set of active shards
* Fail the specified shard if needed, removing it from the current set
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
*/
void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);

/**
* Marks shard copy as stale, removing its allocation id from the set of in-sync allocation ids.
* Marks shard copy as stale if needed, removing its allocation id from
* the set of in-sync allocation ids. Whether marking as stale is needed
* is left up to the implementation.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
Expand All @@ -391,8 +397,8 @@ void failShard(ShardRouting replica, long primaryTerm, String message, Exception
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
*/
void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}

/**
Expand Down Expand Up @@ -422,13 +428,13 @@ public RetryOnPrimaryException(StreamInput in) throws IOException {
}
}

public interface PrimaryResult<R extends ReplicationRequest<R>> {
public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {

/**
* @return null if no operation needs to be sent to a replica
* (for example when the operation failed on the primary due to a parsing exception)
*/
@Nullable R replicaRequest();
@Nullable RequestT replicaRequest();

void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ public abstract class TransportReplicationAction<

private final TransportService transportService;
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
private final IndicesService indicesService;
private final ShardStateAction shardStateAction;
private final TransportRequestOptions transportOptions;
private final String executor;

// package private for testing
private final String transportReplicaAction;
private final String transportPrimaryAction;
private final ReplicasProxy replicasProxy;
private final ReplicationOperation.Replicas replicasProxy;

protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove the shard state action here, no?

Expand Down Expand Up @@ -135,7 +135,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans

this.transportOptions = transportOptions();

this.replicasProxy = new ReplicasProxy();
this.replicasProxy = newReplicasProxy();
}

@Override
Expand All @@ -148,6 +148,10 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new ReroutePhase((ReplicationTask) task, request, listener).run();
}

protected ReplicationOperation.Replicas newReplicasProxy() {
return new ReplicasProxy();
}

protected abstract Response newResponseInstance();

/**
Expand Down Expand Up @@ -369,8 +373,7 @@ protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaReq
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
);
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName);
}
}

Expand Down Expand Up @@ -1030,7 +1033,13 @@ public String allocationId() {
}
}

final class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
/**
* The {@code ReplicasProxy} is an implementation of the {@code Replicas}
* interface that performs the actual {@code ReplicaRequest} on the replica
* shards. It also encapsulates the logic required for failing the replica
* if deemed necessary as well as marking it as stale when needed.
*/
class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {

@Override
public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener<ReplicationOperation.ReplicaResponse> listener) {
Expand All @@ -1041,45 +1050,28 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen
return;
}
final ConcreteShardRequest<ReplicaRequest> concreteShardRequest =
new ConcreteShardRequest<>(request, replica.allocationId().getId());
new ConcreteShardRequest<>(request, replica.allocationId().getId());
sendReplicaRequest(concreteShardRequest, node, listener);
}

@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to fail.
onSuccess.run();
}

@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}

private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}

@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to be marked as stale.
onSuccess.run();
}
}

Expand Down
Loading