Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final AtomicReference<ClusterState> state; // last applied state

private final String nodeName;

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.nodeName = nodeName;
}

public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand All @@ -130,7 +133,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(state.get(), "please set initial state before starting");
addListener(localNodeMasterListeners);
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName() + "/" + CLUSTER_UPDATE_THREAD_NAME,
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
Expand All @@ -60,17 +61,20 @@ public class ClusterService extends AbstractLifecycleComponent {

private final ClusterSettings clusterSettings;

private final String nodeName;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.masterService = new MasterService(nodeName, settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
// Add a no-op update consumer so changes are logged
this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {});
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
this.clusterApplierService = new ClusterApplierService(nodeName, settings, clusterSettings, threadPool);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -199,6 +203,13 @@ public Settings getSettings() {
return settings;
}

/**
* The name of this node.
*/
public final String getNodeName() {
return nodeName;
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class MasterService extends AbstractLifecycleComponent {

public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

private final String nodeName;

private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;

private java.util.function.Supplier<ClusterState> clusterStateSupplier;
Expand All @@ -81,8 +83,9 @@ public class MasterService extends AbstractLifecycleComponent {
private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;

public MasterService(Settings settings, ThreadPool threadPool) {
public MasterService(String nodeName, Settings settings, ThreadPool threadPool) {
super(settings);
this.nodeName = nodeName;
// TODO: introduce a dedicated setting for master service
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.threadPool = threadPool;
Expand All @@ -105,7 +108,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName() + "/" + MASTER_UPDATE_THREAD_NAME,
nodeName + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;

public abstract class AbstractComponent {

Expand All @@ -36,11 +35,4 @@ public AbstractComponent(Settings settings) {
this.deprecationLogger = new DeprecationLogger(logger);
this.settings = settings;
}

/**
* Returns the nodes name from the settings or the empty string if not set.
*/
public final String nodeName() {
return Node.NODE_NAME_SETTING.get(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -117,6 +118,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {

private final TimeValue resolveTimeout;

private final String nodeName;

private volatile boolean closed = false;

public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
Expand All @@ -131,6 +134,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);

resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
nodeName = Node.NODE_NAME_SETTING.get(settings);
logger.debug(
"using concurrent_connects [{}], resolve_timeout [{}]",
concurrentConnects,
Expand All @@ -141,7 +145,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService

final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastZenPingExecutorService = EsExecutors.newScaling(
nodeName() + "/" + "unicast_connect",
nodeName + "/" + "unicast_connect",
0,
concurrentConnects,
60,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -209,6 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private final TransportLogger transportLogger;
private final BytesReference pingMessage;
private final String nodeName;

public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
Expand All @@ -223,6 +225,7 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
this.nodeName = Node.NODE_NAME_SETTING.get(settings);

final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
Expand Down Expand Up @@ -947,7 +950,7 @@ public void sendErrorResponse(
stream.setVersion(nodeVersion);
stream.setFeatures(features);
RemoteTransportException tx = new RemoteTransportException(
nodeName(), new TransportAddress(channel.getLocalAddress()), action, error);
nodeName, new TransportAddress(channel.getLocalAddress()), action, error);
threadPool.getThreadContext().writeTo(stream);
stream.writeException(tx);
byte status = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ static class TimedClusterApplierService extends ClusterApplierService {
public volatile Long currentTimeOverride = null;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
super("test_node", settings, clusterSettings, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ static class TimedMasterService extends MasterService {
public volatile Long currentTimeOverride = null;

TimedMasterService(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
super("test_node", settings, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
public class ClusterServiceUtils {

public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) {
MasterService masterService = new MasterService(Settings.EMPTY, threadPool);
MasterService masterService = new MasterService("test_master_node", Settings.EMPTY, threadPool);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state()));
masterService.setClusterStateSupplier(clusterStateRef::get);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
return emptyList();
}

Auditor auditor = new Auditor(client, clusterService.nodeName());
Auditor auditor = new Auditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
}

private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService.nodeName());
Auditor auditor = new Auditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client, threadPool),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {

if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
assert clusterService.localNode().isMasterNode();
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
LOGGER.error("Job update was submitted to non-master node [" + clusterService.getNodeName() + "]; update for job ["
+ update.getJobId() + "] will be ignored");
executeProcessUpdates(updatesIterator);
return;
Expand Down