5252import org .elasticsearch .discovery .zen .fd .MasterFaultDetection ;
5353import org .elasticsearch .discovery .zen .fd .NodesFaultDetection ;
5454import org .elasticsearch .discovery .zen .membership .MembershipAction ;
55+ import org .elasticsearch .discovery .zen .ping .PingContextProvider ;
5556import org .elasticsearch .discovery .zen .ping .ZenPing ;
5657import org .elasticsearch .discovery .zen .ping .ZenPingService ;
5758import org .elasticsearch .discovery .zen .publish .PublishClusterStateAction ;
6970import java .util .concurrent .CopyOnWriteArrayList ;
7071import java .util .concurrent .atomic .AtomicBoolean ;
7172import java .util .concurrent .atomic .AtomicInteger ;
73+ import java .util .concurrent .atomic .AtomicLong ;
7274
7375import static com .google .common .collect .Lists .newArrayList ;
7476import static org .elasticsearch .common .unit .TimeValue .timeValueSeconds ;
7577
7678/**
7779 *
7880 */
79- public class ZenDiscovery extends AbstractLifecycleComponent <Discovery > implements Discovery , DiscoveryNodesProvider {
81+ public class ZenDiscovery extends AbstractLifecycleComponent <Discovery > implements Discovery , PingContextProvider {
8082
8183 public final static String SETTING_REJOIN_ON_MASTER_GONE = "discovery.zen.rejoin_on_master_gone" ;
8284 public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping.timeout" ;
@@ -139,6 +141,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
139141
140142 private volatile boolean rejoinOnMasterGone ;
141143
144+ /** counts the time this node has joined the cluster or have elected it self as master */
145+ private final AtomicLong clusterJoinsCounter = new AtomicLong ();
146+
142147 @ Nullable
143148 private NodeService nodeService ;
144149
@@ -194,7 +199,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
194199 this .nodesFD .addListener (new NodeFaultDetectionListener ());
195200
196201 this .publishClusterState = new PublishClusterStateAction (settings , transportService , this , new NewClusterStateListener (), discoverySettings , clusterName );
197- this .pingService .setNodesProvider (this );
202+ this .pingService .setPingContextProvider (this );
198203 this .membership = new MembershipAction (settings , clusterService , transportService , this , new MembershipListener ());
199204
200205 transportService .registerHandler (DISCOVERY_REJOIN_ACTION_NAME , new RejoinClusterRequestHandler ());
@@ -290,6 +295,7 @@ public String nodeDescription() {
290295 return clusterName .value () + "/" + localNode .id ();
291296 }
292297
298+ /** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
293299 @ Override
294300 public DiscoveryNodes nodes () {
295301 DiscoveryNodes latestNodes = this .latestDiscoNodes ;
@@ -305,6 +311,14 @@ public NodeService nodeService() {
305311 return this .nodeService ;
306312 }
307313
314+ @ Override
315+ public boolean nodeHasJoinedClusterOnce () {
316+ return clusterJoinsCounter .get () > 0 ;
317+ }
318+
319+ /** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
320+
321+
308322 @ Override
309323 public void publish (ClusterState clusterState , AckListener ackListener ) {
310324 if (!master ) {
@@ -387,6 +401,8 @@ public void onFailure(String source, Throwable t) {
387401 @ Override
388402 public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
389403 sendInitialStateEventIfNeeded ();
404+ long count = clusterJoinsCounter .incrementAndGet ();
405+ logger .trace ("cluster joins counter set to [{}] (elected as master)" , count );
390406 }
391407 });
392408 } else {
@@ -404,8 +420,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
404420 }
405421
406422 masterFD .start (masterNode , "initial_join" );
407- // no need to submit the received cluster state, we will get it from the master when it publishes
408- // the fact that we joined
423+ long count = clusterJoinsCounter . incrementAndGet ();
424+ logger . trace ( "cluster joins counter set to [{}] ( joined master)" , count );
409425 }
410426 }
411427 }
@@ -922,7 +938,7 @@ private DiscoveryNode findMaster() {
922938 sb .append (" {none}" );
923939 } else {
924940 for (ZenPing .PingResponse pingResponse : fullPingResponses ) {
925- sb .append ("\n \t --> " ).append ("target [" ). append ( pingResponse . target ()). append ( "], master [" ). append ( pingResponse . master ()). append ( "]" );
941+ sb .append ("\n \t --> " ).append (pingResponse );
926942 }
927943 }
928944 logger .trace (sb .toString ());
@@ -931,7 +947,7 @@ private DiscoveryNode findMaster() {
931947 // filter responses
932948 List <ZenPing .PingResponse > pingResponses = Lists .newArrayList ();
933949 for (ZenPing .PingResponse pingResponse : fullPingResponses ) {
934- DiscoveryNode node = pingResponse .target ();
950+ DiscoveryNode node = pingResponse .node ();
935951 if (masterElectionFilterClientNodes && (node .clientNode () || (!node .masterNode () && !node .dataNode ()))) {
936952 // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
937953 } else if (masterElectionFilterDataNodes && (!node .masterNode () && node .dataNode ())) {
@@ -947,7 +963,7 @@ private DiscoveryNode findMaster() {
947963 sb .append (" {none}" );
948964 } else {
949965 for (ZenPing .PingResponse pingResponse : pingResponses ) {
950- sb .append ("\n \t --> " ).append ("target [" ). append ( pingResponse . target ()). append ( "], master [" ). append ( pingResponse . master ()). append ( "]" );
966+ sb .append ("\n \t --> " ).append (pingResponse );
951967 }
952968 }
953969 logger .debug (sb .toString ());
@@ -963,20 +979,38 @@ private DiscoveryNode findMaster() {
963979 }
964980 }
965981
966- Set <DiscoveryNode > possibleMasterNodes = Sets .newHashSet ();
982+ // nodes discovered during pinging
983+ Set <DiscoveryNode > activeNodes = Sets .newHashSet ();
984+ // nodes discovered who has previously been part of the cluster and do not ping for the very first time
985+ Set <DiscoveryNode > joinedOnceActiveNodes = Sets .newHashSet ();
967986 if (localNode .masterNode ()) {
968- possibleMasterNodes .add (localNode );
987+ activeNodes .add (localNode );
988+ long joinsCounter = clusterJoinsCounter .get ();
989+ if (joinsCounter > 0 ) {
990+ logger .trace ("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})" , joinsCounter );
991+ joinedOnceActiveNodes .add (localNode );
992+ }
969993 }
970994 for (ZenPing .PingResponse pingResponse : pingResponses ) {
971- possibleMasterNodes .add (pingResponse .target ());
995+ activeNodes .add (pingResponse .node ());
996+ if (pingResponse .hasJoinedOnce ()) {
997+ joinedOnceActiveNodes .add (pingResponse .node ());
998+ }
972999 }
9731000
9741001 if (pingMasters .isEmpty ()) {
975- // if we don't have enough master nodes, we bail, because there are not enough master to elect from
976- if (electMaster .hasEnoughMasterNodes (possibleMasterNodes )) {
977- return electMaster .electMaster (possibleMasterNodes );
1002+ if (electMaster .hasEnoughMasterNodes (activeNodes )) {
1003+ // we give preference to nodes who have previously already joined the cluster. Those will
1004+ // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
1005+ // by the gateway)
1006+ DiscoveryNode master = electMaster .electMaster (joinedOnceActiveNodes );
1007+ if (master != null ) {
1008+ return master ;
1009+ }
1010+ return electMaster .electMaster (activeNodes );
9781011 } else {
979- logger .trace ("not enough master nodes [{}]" , possibleMasterNodes );
1012+ // if we don't have enough master nodes, we bail, because there are not enough master to elect from
1013+ logger .trace ("not enough master nodes [{}]" , activeNodes );
9801014 return null ;
9811015 }
9821016 } else {
0 commit comments