Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.fd;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;

public abstract class FaultDetection extends AbstractComponent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a javadoc string would be awesome


public static final String SETTING_CONNECT_ON_NETWORK_DISCONNECT = "discovery.zen.fd.connect_on_network_disconnect";
public static final String SETTING_PING_INTERVAL = "discovery.zen.fd.ping_interval";
public static final String SETTING_PING_TIMEOUT = "discovery.zen.fd.ping_timeout";
public static final String SETTING_PING_RETRIES = "discovery.zen.fd.ping_retries";
public static final String SETTING_REGISTER_CONNECTION_LISTENER = "discovery.zen.fd.register_connection_listener";

protected final ThreadPool threadPool;
protected final ClusterName clusterName;
protected final TransportService transportService;

// used mainly for testing, should always be true
protected final boolean registerConnectionListener;
protected final FDConnectionListener connectionListener;
protected final boolean connectOnNetworkDisconnect;

protected final TimeValue pingInterval;
protected final TimeValue pingRetryTimeout;
protected final int pingRetryCount;

public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;

this.connectOnNetworkDisconnect = settings.getAsBoolean(SETTING_CONNECT_ON_NETWORK_DISCONNECT, false);
this.pingInterval = settings.getAsTime(SETTING_PING_INTERVAL, timeValueSeconds(1));
this.pingRetryTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, timeValueSeconds(30));
this.pingRetryCount = settings.getAsInt(SETTING_PING_RETRIES, 3);
this.registerConnectionListener = settings.getAsBoolean(SETTING_REGISTER_CONNECTION_LISTENER, true);

this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
}

public void close() {
transportService.removeConnectionListener(connectionListener);
}

abstract void handleTransportDisconnect(DiscoveryNode node);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we document this method?


private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -37,13 +36,12 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.transport.TransportRequestOptions.options;

/**
* A fault detection that pings the master periodically to see if its alive.
*/
public class MasterFaultDetection extends AbstractComponent {
public class MasterFaultDetection extends FaultDetection {

public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";

Expand All @@ -54,31 +52,10 @@ public static interface Listener {
void onDisconnectedFromMaster();
}

private final ThreadPool threadPool;

private final TransportService transportService;

private final DiscoveryNodesProvider nodesProvider;

private final ClusterName clusterName;

private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();


private final boolean connectOnNetworkDisconnect;

private final TimeValue pingInterval;

private final TimeValue pingRetryTimeout;

private final int pingRetryCount;

// used mainly for testing, should always be true
private final boolean registerConnectionListener;


private final FDConnectionListener connectionListener;

private volatile MasterPinger masterPinger;

private final Object masterNodeMutex = new Object();
Expand All @@ -91,25 +68,11 @@ public static interface Listener {

public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
DiscoveryNodesProvider nodesProvider, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
super(settings, threadPool, transportService, clusterName);
this.nodesProvider = nodesProvider;
this.clusterName = clusterName;

this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);

logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}

transportService.registerHandler(MASTER_PING_ACTION_NAME, new MasterPingRequestHandler());
}

Expand Down Expand Up @@ -188,13 +151,14 @@ private void innerStop() {
}

public void close() {
super.close();
stop("closing");
this.listeners.clear();
transportService.removeConnectionListener(connectionListener);
transportService.removeHandler(MASTER_PING_ACTION_NAME);
}

private void handleTransportDisconnect(DiscoveryNode node) {
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
synchronized (masterNodeMutex) {
if (!node.equals(this.masterNode)) {
return;
Expand Down Expand Up @@ -245,17 +209,6 @@ public void run() {
}
}

private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}

private class MasterPinger implements Runnable {

private volatile boolean running = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -37,14 +36,13 @@
import java.util.concurrent.CopyOnWriteArrayList;

import static org.elasticsearch.cluster.node.DiscoveryNodes.EMPTY_NODES;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.transport.TransportRequestOptions.options;

/**
* A fault detection of multiple nodes.
*/
public class NodesFaultDetection extends AbstractComponent {
public class NodesFaultDetection extends FaultDetection {

public static final String PING_ACTION_NAME = "internal:discovery/zen/fd/ping";

Expand All @@ -56,56 +54,22 @@ public void onPingReceived(PingRequest pingRequest) {}

}

private final ThreadPool threadPool;

private final TransportService transportService;
private final ClusterName clusterName;


private final boolean connectOnNetworkDisconnect;

private final TimeValue pingInterval;

private final TimeValue pingRetryTimeout;

private final int pingRetryCount;

// used mainly for testing, should always be true
private final boolean registerConnectionListener;


private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();

private final FDConnectionListener connectionListener;

private volatile DiscoveryNodes latestNodes = EMPTY_NODES;

private volatile long clusterStateVersion = -1;

private volatile boolean running = false;

public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;

this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
super(settings, threadPool, transportService, clusterName);

logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

transportService.registerHandler(PING_ACTION_NAME, new PingRequestHandler());

this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
}

public void addListener(Listener listener) {
Expand Down Expand Up @@ -157,12 +121,13 @@ public NodesFaultDetection stop() {
}

public void close() {
super.close();
stop();
transportService.removeHandler(PING_ACTION_NAME);
transportService.removeConnectionListener(connectionListener);
}

private void handleTransportDisconnect(DiscoveryNode node) {
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
Expand Down Expand Up @@ -295,18 +260,6 @@ static class NodeFD {
volatile boolean running = true;
}

private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}


class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
Expand Down Expand Up @@ -114,8 +115,8 @@ private List<String> startCluster(int numberOfNodes, int minimumMasterNode) thro
}

final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s") // for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_RETRIES, "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.node.service.NodeService;
Expand Down Expand Up @@ -131,7 +132,8 @@ public void testNodesFaultDetectionConnectOnDisconnect() throws InterruptedExcep
ImmutableSettings.Builder settings = ImmutableSettings.builder();
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry)
.put(FaultDetection.SETTING_PING_INTERVAL, "5m");
NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA, new ClusterName("test"));
nodesFD.start();
nodesFD.updateNodes(buildNodesForA(true), -1);
Expand Down Expand Up @@ -165,7 +167,8 @@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce
ImmutableSettings.Builder settings = ImmutableSettings.builder();
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry)
.put(FaultDetection.SETTING_PING_INTERVAL, "5m");
ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20));
final DiscoveryNodes nodes = buildNodesForA(false);
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

Expand Down Expand Up @@ -54,8 +55,8 @@ public void testChangeRejoinOnMasterOptionIsDynamic() throws Exception {
@Test
public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception {
Settings defaultSettings = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s")
.put("discovery.zen.fd.ping_retries", "1")
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s")
.put(FaultDetection.SETTING_PING_RETRIES, "1")
.put("discovery.type", "zen")
.build();

Expand Down
Loading