Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -55,6 +56,7 @@
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.UnicastConfiguredHostsResolver;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
Expand Down Expand Up @@ -117,7 +119,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<CoordinatorPublication> currentPublication = Optional.empty();

public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
AllocationService allocationService, MasterService masterService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider,
ClusterApplier clusterApplier, Random random) {
super(settings);
Expand All @@ -136,7 +138,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
Expand Down Expand Up @@ -467,8 +470,7 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
// TODO implement
return null;
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
}

@Override
Expand Down Expand Up @@ -761,8 +763,10 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
getLocalNode() + " should be in published " + clusterState;

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, new ListenableFuture<>(), ackListener,
publishListener);
final PublicationTransportHandler.PublicationContext publicationContext =
publicationHandler.newPublicationContext(clusterChangedEvent);
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
new ListenableFuture<>(), ackListener, publishListener);
currentPublication = Optional.of(publication);

transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
Expand Down Expand Up @@ -885,14 +889,15 @@ class CoordinatorPublication extends Publication {
private final ListenableFuture<Void> localNodeAckEvent;
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
private final PublicationTransportHandler.PublicationContext publicationContext;

// We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
// safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
private final List<Join> receivedJoins = new ArrayList<>();
private boolean receivedJoinsProcessed;

CoordinatorPublication(PublishRequest publishRequest, ListenableFuture<Void> localNodeAckEvent, AckListener ackListener,
ActionListener<Void> publishListener) {
CoordinatorPublication(PublishRequest publishRequest, PublicationTransportHandler.PublicationContext publicationContext,
ListenableFuture<Void> localNodeAckEvent, AckListener ackListener, ActionListener<Void> publishListener) {
super(publishRequest,
new AckListener() {
@Override
Expand All @@ -918,6 +923,7 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
},
transportService.getThreadPool()::relativeTimeInMillis);
this.publishRequest = publishRequest;
this.publicationContext = publicationContext;
this.localNodeAckEvent = localNodeAckEvent;
this.ackListener = ackListener;
this.publishListener = publishListener;
Expand Down Expand Up @@ -1046,7 +1052,7 @@ protected void onMissingJoin(DiscoveryNode discoveryNode) {
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
publicationHandler.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener));
publicationContext.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener));
}

@Override
Expand Down
Loading