Skip to content

Commit 0a69985

Browse files
committed
Merge pull request #17038 from jasontedor/enable_acked
Prepare for enabling acked indexing
2 parents 19567fb + 95feb40 commit 0a69985

File tree

12 files changed: +284 additions, -145 deletions (lines changed)

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.List;
8080
import java.util.Locale;
8181
import java.util.Map;
82+
import java.util.concurrent.ConcurrentHashMap;
8283
import java.util.concurrent.ConcurrentMap;
8384
import java.util.concurrent.atomic.AtomicBoolean;
8485
import java.util.concurrent.atomic.AtomicInteger;
@@ -679,7 +680,7 @@ protected void doRun() throws Exception {
679680
return;
680681
}
681682
// closed in finishAsFailed(e) in the case of error
682-
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
683+
indexShardReference = getIndexShardReferenceOnPrimary(shardId, request);
683684
if (indexShardReference.isRelocated() == false) {
684685
executeLocally();
685686
} else {
@@ -797,7 +798,7 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
797798
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
798799
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}).
799800
*/
800-
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
801+
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
801802
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
802803
IndexShard indexShard = indexService.getShard(shardId.id());
803804
// we may end up here if the cluster state used to route the primary is so stale that the underlying
@@ -816,7 +817,8 @@ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
816817
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
817818
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
818819
IndexShard indexShard = indexService.getShard(shardId.id());
819-
return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
820+
IndexShardReference ref = IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
821+
return ref;
820822
}
821823

822824
/**
@@ -997,30 +999,38 @@ public void handleException(TransportException exp) {
997999
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
9981000
logger.warn("[{}] {}", exp, shardId, message);
9991001
shardStateAction.shardFailed(
1000-
shard,
1001-
indexShardReference.routingEntry(),
1002-
message,
1003-
exp,
1004-
new ShardStateAction.Listener() {
1005-
@Override
1006-
public void onSuccess() {
1007-
onReplicaFailure(nodeId, exp);
1008-
}
1009-
1010-
@Override
1011-
public void onFailure(Throwable shardFailedError) {
1012-
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
1013-
ShardRouting primaryShard = indexShardReference.routingEntry();
1014-
String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
1015-
// we are no longer the primary, fail ourselves and start over
1016-
indexShardReference.failShard(message, shardFailedError);
1017-
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
1018-
} else {
1019-
assert false : shardFailedError;
1002+
shard,
1003+
indexShardReference.routingEntry(),
1004+
message,
1005+
exp,
1006+
new ShardStateAction.Listener() {
1007+
@Override
1008+
public void onSuccess() {
10201009
onReplicaFailure(nodeId, exp);
10211010
}
1011+
1012+
@Override
1013+
public void onFailure(Throwable shardFailedError) {
1014+
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
1015+
String message = "unknown";
1016+
try {
1017+
ShardRouting primaryShard = indexShardReference.routingEntry();
1018+
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
1019+
// we are no longer the primary, fail ourselves and start over
1020+
indexShardReference.failShard(message, shardFailedError);
1021+
} catch (Throwable t) {
1022+
shardFailedError.addSuppressed(t);
1023+
}
1024+
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
1025+
} else {
1026+
// these can occur if the node is shutting down and are okay
1027+
// any other exception here is not expected and merits investigation
1028+
assert shardFailedError instanceof TransportException ||
1029+
shardFailedError instanceof NodeClosedException : shardFailedError;
1030+
onReplicaFailure(nodeId, exp);
1031+
}
1032+
}
10221033
}
1023-
}
10241034
);
10251035
}
10261036
}
@@ -1108,7 +1118,9 @@ protected boolean shouldExecuteReplication(Settings settings) {
11081118

11091119
interface IndexShardReference extends Releasable {
11101120
boolean isRelocated();
1121+
11111122
void failShard(String reason, @Nullable Throwable e);
1123+
11121124
ShardRouting routingEntry();
11131125

11141126
/** returns the primary term of the current operation */

core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.transport.ConnectTransportException;
5454
import org.elasticsearch.transport.EmptyTransportResponseHandler;
5555
import org.elasticsearch.transport.NodeDisconnectedException;
56+
import org.elasticsearch.transport.RemoteTransportException;
5657
import org.elasticsearch.transport.TransportChannel;
5758
import org.elasticsearch.transport.TransportException;
5859
import org.elasticsearch.transport.TransportRequest;
@@ -111,7 +112,7 @@ public void handleException(TransportException exp) {
111112
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
112113
} else {
113114
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
114-
listener.onFailure(exp.getCause());
115+
listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp);
115116
}
116117
}
117118
});

core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ synchronized protected void doStart() {
210210
@Override
211211
synchronized protected void doStop() {
212212
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
213+
onGoingTimeout.cancel();
213214
try {
214215
onGoingTimeout.cancel();
215216
onGoingTimeout.listener.onClose();
@@ -218,6 +219,12 @@ synchronized protected void doStop() {
218219
}
219220
}
220221
ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS);
222+
// close timeout listeners that did not have an ongoing timeout
223+
postAppliedListeners
224+
.stream()
225+
.filter(listener -> listener instanceof TimeoutClusterStateListener)
226+
.map(listener -> (TimeoutClusterStateListener)listener)
227+
.forEach(TimeoutClusterStateListener::onClose);
221228
remove(localNodeMasterListeners);
222229
}
223230

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3737
import org.elasticsearch.cluster.service.ClusterService;
3838
import org.elasticsearch.common.Priority;
39+
import org.elasticsearch.common.SuppressForbidden;
3940
import org.elasticsearch.common.component.AbstractLifecycleComponent;
4041
import org.elasticsearch.common.component.Lifecycle;
4142
import org.elasticsearch.common.inject.Inject;
@@ -188,7 +189,14 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
188189
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
189190
this.nodesFD.addListener(new NodeFaultDetectionListener());
190191

191-
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName);
192+
this.publishClusterState =
193+
new PublishClusterStateAction(
194+
settings,
195+
transportService,
196+
clusterService::state,
197+
new NewPendingClusterStateListener(),
198+
discoverySettings,
199+
clusterName);
192200
this.pingService.setPingContextProvider(this);
193201
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
194202

@@ -766,15 +774,24 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
766774
* If the first condition fails we reject the cluster state and throw an error.
767775
* If the second condition fails we ignore the cluster state.
768776
*/
769-
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
777+
@SuppressForbidden(reason = "debug")
778+
public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
770779
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
771-
if (currentState.supersedes(newClusterState)) {
780+
781+
// reject cluster states that are not new from the same master
782+
if (currentState.supersedes(newClusterState) ||
783+
(newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) {
772784
// if the new state has a smaller version, and it has the same master node, then no need to process it
785+
logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
786+
return true;
787+
}
788+
789+
// reject older cluster states if we are following a master
790+
if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) {
773791
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
774792
return true;
775-
} else {
776-
return false;
777793
}
794+
return false;
778795
}
779796

780797
/**

core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,16 +164,18 @@ public synchronized void markAsProcessed(ClusterState state) {
164164
currentMaster
165165
);
166166
}
167-
} else if (state.supersedes(pendingState) && pendingContext.committed()) {
168-
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
169-
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
170-
);
171-
contextsToRemove.add(pendingContext);
172-
pendingContext.listener.onNewClusterStateProcessed();
173167
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
174168
assert pendingContext.committed() : "processed cluster state is not committed " + state;
175169
contextsToRemove.add(pendingContext);
176170
pendingContext.listener.onNewClusterStateProcessed();
171+
} else if (state.version() >= pendingState.version()) {
172+
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
173+
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
174+
);
175+
contextsToRemove.add(pendingContext);
176+
if (pendingContext.committed()) {
177+
pendingContext.listener.onNewClusterStateProcessed();
178+
}
177179
}
178180
}
179181
// now ack the processed state

core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
4242
import org.elasticsearch.discovery.Discovery;
4343
import org.elasticsearch.discovery.DiscoverySettings;
44-
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
4544
import org.elasticsearch.discovery.zen.ZenDiscovery;
4645
import org.elasticsearch.threadpool.ThreadPool;
4746
import org.elasticsearch.transport.BytesTransportRequest;
@@ -58,11 +57,13 @@
5857
import java.util.ArrayList;
5958
import java.util.HashMap;
6059
import java.util.HashSet;
60+
import java.util.Locale;
6161
import java.util.Map;
6262
import java.util.Set;
6363
import java.util.concurrent.CountDownLatch;
6464
import java.util.concurrent.TimeUnit;
6565
import java.util.concurrent.atomic.AtomicBoolean;
66+
import java.util.function.Supplier;
6667

6768
/**
6869
*
@@ -81,17 +82,22 @@ public interface NewPendingClusterStateListener {
8182
}
8283

8384
private final TransportService transportService;
84-
private final DiscoveryNodesProvider nodesProvider;
85+
private final Supplier<ClusterState> clusterStateSupplier;
8586
private final NewPendingClusterStateListener newPendingClusterStatelistener;
8687
private final DiscoverySettings discoverySettings;
8788
private final ClusterName clusterName;
8889
private final PendingClusterStatesQueue pendingStatesQueue;
8990

90-
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
91-
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
91+
public PublishClusterStateAction(
92+
Settings settings,
93+
TransportService transportService,
94+
Supplier<ClusterState> clusterStateSupplier,
95+
NewPendingClusterStateListener listener,
96+
DiscoverySettings discoverySettings,
97+
ClusterName clusterName) {
9298
super(settings);
9399
this.transportService = transportService;
94-
this.nodesProvider = nodesProvider;
100+
this.clusterStateSupplier = clusterStateSupplier;
95101
this.newPendingClusterStatelistener = listener;
96102
this.discoverySettings = discoverySettings;
97103
this.clusterName = clusterName;
@@ -363,7 +369,7 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
363369
final ClusterState incomingState;
364370
// If true we received full cluster state - otherwise diffs
365371
if (in.readBoolean()) {
366-
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
372+
incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
367373
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
368374
} else if (lastSeenClusterState != null) {
369375
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
@@ -394,14 +400,25 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus
394400
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
395401
throw new IllegalStateException("received state from a node that is not part of the cluster");
396402
}
397-
final DiscoveryNodes currentNodes = nodesProvider.nodes();
403+
final ClusterState clusterState = clusterStateSupplier.get();
398404

399-
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
405+
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
400406
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
401-
throw new IllegalStateException("received state from a node that is not part of the cluster");
407+
throw new IllegalStateException("received state with a local node that does not match the current local node");
408+
}
409+
410+
if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) {
411+
String message = String.format(
412+
Locale.ROOT,
413+
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
414+
incomingState.version(),
415+
incomingState.stateUUID(),
416+
incomingState.nodes().getMasterNodeId()
417+
);
418+
logger.warn(message);
419+
throw new IllegalStateException(message);
402420
}
403421

404-
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
405422
}
406423

407424
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
@@ -518,7 +535,7 @@ public void waitForCommit(TimeValue commitTimeout) {
518535
}
519536

520537
if (timedout) {
521-
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]");
538+
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])");
522539
}
523540
if (isCommitted() == false) {
524541
throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left",

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,9 @@ public boolean isRelocated() {
10621062
@Override
10631063
public void failShard(String reason, @Nullable Throwable e) {
10641064
isShardFailed.set(true);
1065+
if (randomBoolean()) {
1066+
throw new ElasticsearchException("simulated");
1067+
}
10651068
}
10661069

10671070
@Override
@@ -1173,7 +1176,7 @@ protected boolean resolveIndex() {
11731176

11741177

11751178
@Override
1176-
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
1179+
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
11771180
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
11781181
return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id()));
11791182
}

0 commit comments

Comments
 (0)