Skip to content

Commit 462e91d

Browse files
authored
Logging: Use settings when building daemon threads (#32751)
Subclasses of `EsIntegTestCase` run multiple Elasticsearch nodes in the same JVM and when we log we look at the name of the thread to figure out the node name. This makes sure that all calls to `daemonThreadFactory` include the node name. Closes #32574 I'd like to follow this up with more drastic changes that make it impossible to do this incorrectly but that change is much larger than this and I'd like to get these log lines fixed up sooner rather than later.
1 parent 0749b18 commit 462e91d

File tree

6 files changed

+22
-15
lines changed

6 files changed

+22
-15
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,13 @@ public static String threadName(Settings settings, String namePrefix) {
160160
if (Node.NODE_NAME_SETTING.exists(settings)) {
161161
return threadName(Node.NODE_NAME_SETTING.get(settings), namePrefix);
162162
} else {
163+
// TODO this should only be allowed in tests
163164
return threadName("", namePrefix);
164165
}
165166
}
166167

167168
public static String threadName(final String nodeName, final String namePrefix) {
169+
// TODO missing node names should only be allowed in tests
168170
return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]";
169171
}
170172

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
233233

234234
@Override
235235
protected void doStop() {
236-
ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown"));
236+
ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown"));
237237

238238
// Copy indices because we modify it asynchronously in the body of the loop
239239
final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet());

x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public LicenseService(Settings settings, ClusterService clusterService, Clock cl
120120
super(settings);
121121
this.clusterService = clusterService;
122122
this.clock = clock;
123-
this.scheduler = new SchedulerEngine(clock);
123+
this.scheduler = new SchedulerEngine(settings, clock);
124124
this.licenseState = licenseState;
125125
this.operationModeFileWatcher = new OperationModeFileWatcher(resourceWatcherService,
126126
XPackPlugin.resolveConfigFile(env, "license_mode"), logger,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.scheduler;
77

8+
import org.elasticsearch.common.settings.Settings;
89
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
910
import org.elasticsearch.common.util.concurrent.EsExecutors;
1011
import org.elasticsearch.common.util.concurrent.FutureUtils;
@@ -92,9 +93,9 @@ public interface Schedule {
9293
private final Clock clock;
9394
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
9495

95-
public SchedulerEngine(Clock clock) {
96+
public SchedulerEngine(Settings settings, Clock clock) {
9697
this.clock = clock;
97-
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler"));
98+
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler"));
9899
}
99100

100101
public void register(Listener listener) {

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
194194
return emptyList();
195195
}
196196

197-
SchedulerEngine schedulerEngine = new SchedulerEngine(getClock());
197+
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock());
198198
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool));
199199
}
200200

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.client.Client;
1212
import org.elasticsearch.common.settings.Settings;
1313
import org.elasticsearch.common.util.concurrent.ThreadContext;
14+
import org.elasticsearch.node.Node;
1415
import org.elasticsearch.persistent.PersistentTaskState;
1516
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1617
import org.elasticsearch.search.aggregations.Aggregations;
@@ -47,6 +48,9 @@
4748

4849
public class RollupJobTaskTests extends ESTestCase {
4950

51+
private static final Settings SETTINGS = Settings.builder()
52+
.put(Node.NODE_NAME_SETTING.getKey(), "test")
53+
.build();
5054
private static ThreadPool pool = new TestThreadPool("test");
5155

5256
@AfterClass
@@ -62,7 +66,7 @@ public void testInitialStatusStopped() {
6266
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean());
6367
Client client = mock(Client.class);
6468
when(client.settings()).thenReturn(Settings.EMPTY);
65-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
69+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
6670
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
6771
status, client, schedulerEngine, pool, Collections.emptyMap());
6872
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
@@ -75,7 +79,7 @@ public void testInitialStatusAborting() {
7579
RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar"), randomBoolean());
7680
Client client = mock(Client.class);
7781
when(client.settings()).thenReturn(Settings.EMPTY);
78-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
82+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
7983
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
8084
status, client, schedulerEngine, pool, Collections.emptyMap());
8185
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
@@ -88,7 +92,7 @@ public void testInitialStatusStopping() {
8892
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar"), randomBoolean());
8993
Client client = mock(Client.class);
9094
when(client.settings()).thenReturn(Settings.EMPTY);
91-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
95+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
9296
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
9397
status, client, schedulerEngine, pool, Collections.emptyMap());
9498
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
@@ -101,7 +105,7 @@ public void testInitialStatusStarted() {
101105
RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean());
102106
Client client = mock(Client.class);
103107
when(client.settings()).thenReturn(Settings.EMPTY);
104-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
108+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
105109
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
106110
status, client, schedulerEngine, pool, Collections.emptyMap());
107111
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
@@ -114,7 +118,7 @@ public void testInitialStatusIndexingOldID() {
114118
RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), false);
115119
Client client = mock(Client.class);
116120
when(client.settings()).thenReturn(Settings.EMPTY);
117-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
121+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
118122
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
119123
status, client, schedulerEngine, pool, Collections.emptyMap());
120124
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
@@ -128,7 +132,7 @@ public void testInitialStatusIndexingNewID() {
128132
RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), true);
129133
Client client = mock(Client.class);
130134
when(client.settings()).thenReturn(Settings.EMPTY);
131-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
135+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
132136
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
133137
status, client, schedulerEngine, pool, Collections.emptyMap());
134138
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
@@ -141,7 +145,7 @@ public void testNoInitialStatus() {
141145
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
142146
Client client = mock(Client.class);
143147
when(client.settings()).thenReturn(Settings.EMPTY);
144-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
148+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
145149
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
146150
null, client, schedulerEngine, pool, Collections.emptyMap());
147151
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
@@ -154,7 +158,7 @@ public void testStartWhenStarted() throws InterruptedException {
154158
RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean());
155159
Client client = mock(Client.class);
156160
when(client.settings()).thenReturn(Settings.EMPTY);
157-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
161+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
158162
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
159163
status, client, schedulerEngine, pool, Collections.emptyMap());
160164
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
@@ -641,7 +645,7 @@ public void testStopWhenStopped() throws InterruptedException {
641645
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean());
642646
Client client = mock(Client.class);
643647
when(client.settings()).thenReturn(Settings.EMPTY);
644-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
648+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
645649
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
646650
status, client, schedulerEngine, pool, Collections.emptyMap());
647651
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
@@ -748,7 +752,7 @@ public void testStopWhenAborting() throws InterruptedException {
748752
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean());
749753
Client client = mock(Client.class);
750754
when(client.settings()).thenReturn(Settings.EMPTY);
751-
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
755+
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
752756

753757
CountDownLatch latch = new CountDownLatch(2);
754758

0 commit comments

Comments
 (0)