Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,13 @@ public static String threadName(Settings settings, String namePrefix) {
if (Node.NODE_NAME_SETTING.exists(settings)) {
return threadName(Node.NODE_NAME_SETTING.get(settings), namePrefix);
} else {
// TODO this should only be allowed in tests
return threadName("", namePrefix);
}
}

public static String threadName(final String nodeName, final String namePrefix) {
// TODO missing node names should only be allowed in tests
return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

@Override
protected void doStop() {
ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown"));
ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown"));

// Copy indices because we modify it asynchronously in the body of the loop
final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public LicenseService(Settings settings, ClusterService clusterService, Clock cl
super(settings);
this.clusterService = clusterService;
this.clock = clock;
this.scheduler = new SchedulerEngine(clock);
this.scheduler = new SchedulerEngine(settings, clock);
this.licenseState = licenseState;
this.operationModeFileWatcher = new OperationModeFileWatcher(resourceWatcherService,
XPackPlugin.resolveConfigFile(env, "license_mode"), logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.scheduler;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
Expand Down Expand Up @@ -92,9 +93,9 @@ public interface Schedule {
private final Clock clock;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();

public SchedulerEngine(Clock clock) {
public SchedulerEngine(Settings settings, Clock clock) {
this.clock = clock;
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler"));
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler"));
}

public void register(Listener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
return emptyList();
}

SchedulerEngine schedulerEngine = new SchedulerEngine(getClock());
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock());
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.Node;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.aggregations.Aggregations;
Expand Down Expand Up @@ -47,6 +48,9 @@

public class RollupJobTaskTests extends ESTestCase {

private static final Settings SETTINGS = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "test")
.build();
private static ThreadPool pool = new TestThreadPool("test");

@AfterClass
Expand All @@ -62,7 +66,7 @@ public void testInitialStatusStopped() {
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
Expand All @@ -75,7 +79,7 @@ public void testInitialStatusAborting() {
RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar"), randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
Expand All @@ -88,7 +92,7 @@ public void testInitialStatusStopping() {
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar"), randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
Expand All @@ -101,7 +105,7 @@ public void testInitialStatusStarted() {
RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
Expand All @@ -114,7 +118,7 @@ public void testInitialStatusIndexingOldID() {
RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), false);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
Expand All @@ -128,7 +132,7 @@ public void testInitialStatusIndexingNewID() {
RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), true);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
Expand All @@ -141,7 +145,7 @@ public void testNoInitialStatus() {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
Expand All @@ -154,7 +158,7 @@ public void testStartWhenStarted() throws InterruptedException {
RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
Expand Down Expand Up @@ -641,7 +645,7 @@ public void testStopWhenStopped() throws InterruptedException {
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
Expand Down Expand Up @@ -748,7 +752,7 @@ public void testStopWhenAborting() throws InterruptedException {
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());

CountDownLatch latch = new CountDownLatch(2);

Expand Down