Skip to content

Commit d706b40

Browse files
authored
Clean up Node#close. (#39317)
`Node#close` is pretty hard to rely on today: - it might swallow exceptions - it waits for 10 seconds for threads to terminate but doesn't signal anything if threads are still not terminated after 10 seconds This commit makes `IOException`s propagated and splits `Node#close` into `Node#close` and `Node#awaitClose` so that the decision what to do if a node takes too long to close can be done on top of `Node#close`. It also adds synchronization to lifecycle transitions to make them atomic. I don't think it is a source of problems today, but it makes things easier to reason about.
1 parent bdc0c9c commit d706b40

File tree

14 files changed

+278
-85
lines changed

14 files changed

+278
-85
lines changed

server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Collections;
6060
import java.util.List;
6161
import java.util.concurrent.CountDownLatch;
62+
import java.util.concurrent.TimeUnit;
6263

6364
/**
6465
* Internal startup code.
@@ -183,8 +184,15 @@ public void run() {
183184
IOUtils.close(node, spawner);
184185
LoggerContext context = (LoggerContext) LogManager.getContext(false);
185186
Configurator.shutdown(context);
187+
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
188+
throw new IllegalStateException("Node didn't stop within 10 seconds. " +
189+
"Any outstanding requests or tasks might get killed.");
190+
}
186191
} catch (IOException ex) {
187192
throw new ElasticsearchException("failed to stop node", ex);
193+
} catch (InterruptedException e) {
194+
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
195+
Thread.currentThread().interrupt();
188196
}
189197
}
190198
});
@@ -267,6 +275,12 @@ private void start() throws NodeValidationException {
267275
static void stop() throws IOException {
268276
try {
269277
IOUtils.close(INSTANCE.node, INSTANCE.spawner);
278+
if (INSTANCE.node != null && INSTANCE.node.awaitClose(10, TimeUnit.SECONDS) == false) {
279+
throw new IllegalStateException("Node didn't stop within 10 seconds. Any outstanding requests or tasks might get killed.");
280+
}
281+
} catch (InterruptedException e) {
282+
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
283+
Thread.currentThread().interrupt();
270284
} finally {
271285
INSTANCE.keepAliveLatch.countDown();
272286
}

server/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java

Lines changed: 45 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,12 @@
1919

2020
package org.elasticsearch.common.component;
2121

22-
import org.apache.logging.log4j.LogManager;
23-
import org.apache.logging.log4j.Logger;
24-
2522
import java.io.IOException;
23+
import java.io.UncheckedIOException;
2624
import java.util.List;
2725
import java.util.concurrent.CopyOnWriteArrayList;
2826

2927
public abstract class AbstractLifecycleComponent implements LifecycleComponent {
30-
private static final Logger logger = LogManager.getLogger(AbstractLifecycleComponent.class);
3128

3229
protected final Lifecycle lifecycle = new Lifecycle();
3330

@@ -52,59 +49,64 @@ public void removeLifecycleListener(LifecycleListener listener) {
5249

5350
@Override
5451
public void start() {
55-
if (!lifecycle.canMoveToStarted()) {
56-
return;
57-
}
58-
for (LifecycleListener listener : listeners) {
59-
listener.beforeStart();
60-
}
61-
doStart();
62-
lifecycle.moveToStarted();
63-
for (LifecycleListener listener : listeners) {
64-
listener.afterStart();
52+
synchronized (lifecycle) {
53+
if (!lifecycle.canMoveToStarted()) {
54+
return;
55+
}
56+
for (LifecycleListener listener : listeners) {
57+
listener.beforeStart();
58+
}
59+
doStart();
60+
lifecycle.moveToStarted();
61+
for (LifecycleListener listener : listeners) {
62+
listener.afterStart();
63+
}
6564
}
6665
}
6766

6867
protected abstract void doStart();
6968

7069
@Override
7170
public void stop() {
72-
if (!lifecycle.canMoveToStopped()) {
73-
return;
74-
}
75-
for (LifecycleListener listener : listeners) {
76-
listener.beforeStop();
77-
}
78-
lifecycle.moveToStopped();
79-
doStop();
80-
for (LifecycleListener listener : listeners) {
81-
listener.afterStop();
71+
synchronized (lifecycle) {
72+
if (!lifecycle.canMoveToStopped()) {
73+
return;
74+
}
75+
for (LifecycleListener listener : listeners) {
76+
listener.beforeStop();
77+
}
78+
lifecycle.moveToStopped();
79+
doStop();
80+
for (LifecycleListener listener : listeners) {
81+
listener.afterStop();
82+
}
8283
}
8384
}
8485

8586
protected abstract void doStop();
8687

8788
@Override
8889
public void close() {
89-
if (lifecycle.started()) {
90-
stop();
91-
}
92-
if (!lifecycle.canMoveToClosed()) {
93-
return;
94-
}
95-
for (LifecycleListener listener : listeners) {
96-
listener.beforeClose();
97-
}
98-
lifecycle.moveToClosed();
99-
try {
100-
doClose();
101-
} catch (IOException e) {
102-
// TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient
103-
// structures. Shutting down services should use IOUtils.close
104-
logger.warn("failed to close " + getClass().getName(), e);
105-
}
106-
for (LifecycleListener listener : listeners) {
107-
listener.afterClose();
90+
synchronized (lifecycle) {
91+
if (lifecycle.started()) {
92+
stop();
93+
}
94+
if (!lifecycle.canMoveToClosed()) {
95+
return;
96+
}
97+
for (LifecycleListener listener : listeners) {
98+
listener.beforeClose();
99+
}
100+
lifecycle.moveToClosed();
101+
try {
102+
doClose();
103+
} catch (IOException e) {
104+
throw new UncheckedIOException(e);
105+
} finally {
106+
for (LifecycleListener listener : listeners) {
107+
listener.afterClose();
108+
}
109+
}
108110
}
109111
}
110112

server/src/main/java/org/elasticsearch/common/component/Lifecycle.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,22 @@
3939
* }
4040
* </pre>
4141
* <p>
42+
* NOTE: The Lifecycle class is thread-safe. It is also possible to prevent concurrent state transitions
43+
* by locking on the Lifecycle object itself. This is typically useful when chaining multiple transitions.
44+
* <p>
4245
* Note, closed is only allowed to be called when stopped, so make sure to stop the component first.
43-
* Here is how the logic can be applied:
46+
* Here is how the logic can be applied. A lock of the {@code lifecycleState} object is taken so that
47+
* another thread cannot move the state from {@code STOPPED} to {@code STARTED} before it has moved to
48+
* {@code CLOSED}.
4449
* <pre>
4550
* public void close() {
46-
* if (lifecycleState.started()) {
47-
* stop();
48-
* }
49-
* if (!lifecycleState.moveToClosed()) {
50-
* return;
51+
* synchronized (lifecycleState) {
52+
* if (lifecycleState.started()) {
53+
* stop();
54+
* }
55+
* if (!lifecycleState.moveToClosed()) {
56+
* return;
57+
* }
5158
* }
5259
* // perform close logic here
5360
* }
@@ -116,7 +123,7 @@ public boolean canMoveToStarted() throws IllegalStateException {
116123
}
117124

118125

119-
public boolean moveToStarted() throws IllegalStateException {
126+
public synchronized boolean moveToStarted() throws IllegalStateException {
120127
State localState = this.state;
121128
if (localState == State.INITIALIZED || localState == State.STOPPED) {
122129
state = State.STARTED;
@@ -145,7 +152,7 @@ public boolean canMoveToStopped() throws IllegalStateException {
145152
throw new IllegalStateException("Can't move to stopped with unknown state");
146153
}
147154

148-
public boolean moveToStopped() throws IllegalStateException {
155+
public synchronized boolean moveToStopped() throws IllegalStateException {
149156
State localState = state;
150157
if (localState == State.STARTED) {
151158
state = State.STOPPED;
@@ -171,7 +178,7 @@ public boolean canMoveToClosed() throws IllegalStateException {
171178
return true;
172179
}
173180

174-
public boolean moveToClosed() throws IllegalStateException {
181+
public synchronized boolean moveToClosed() throws IllegalStateException {
175182
State localState = state;
176183
if (localState == State.CLOSED) {
177184
return false;

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,18 @@ public void close() {
131131
public StoredContext stashContext() {
132132
final ThreadContextStruct context = threadLocal.get();
133133
threadLocal.set(null);
134-
return () -> threadLocal.set(context);
134+
return () -> {
135+
// If the node and thus the threadLocal get closed while this task
136+
// is still executing, we don't want this runnable to fail with an
137+
// uncaught exception
138+
try {
139+
threadLocal.set(context);
140+
} catch (IllegalStateException e) {
141+
if (isClosed() == false) {
142+
throw e;
143+
}
144+
}
145+
};
135146
}
136147

137148
/**

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
2525
import org.apache.lucene.index.DirectoryReader;
26+
import org.apache.lucene.index.IndexReader.CacheHelper;
2627
import org.apache.lucene.store.AlreadyClosedException;
2728
import org.apache.lucene.store.LockObtainFailedException;
2829
import org.apache.lucene.util.CollectionUtil;
@@ -200,6 +201,7 @@ public class IndicesService extends AbstractLifecycleComponent
200201
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
201202
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
202203
final AbstractRefCounted indicesRefCount; // pkg-private for testing
204+
private final CountDownLatch closeLatch = new CountDownLatch(1);
203205

204206
@Override
205207
protected void doStart() {
@@ -273,6 +275,8 @@ protected void closeInternal() {
273275
indicesQueryCache);
274276
} catch (IOException e) {
275277
throw new UncheckedIOException(e);
278+
} finally {
279+
closeLatch.countDown();
276280
}
277281
}
278282
};
@@ -311,6 +315,18 @@ protected void doClose() throws IOException {
311315
indicesRefCount.decRef();
312316
}
313317

318+
/**
319+
* Wait for this {@link IndicesService} to be effectively closed. When this returns {@code true}, all shards and shard stores
320+
* are closed and all shard {@link CacheHelper#addClosedListener(org.apache.lucene.index.IndexReader.ClosedListener) closed
321+
* listeners} have run. However some {@link IndexEventListener#onStoreClosed(ShardId) shard closed listeners} might not have
322+
* run.
323+
* @returns true if all shards closed within the given timeout, false otherwise
324+
* @throws InterruptedException if the current thread got interrupted while waiting for shards to close
325+
*/
326+
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
327+
return closeLatch.await(timeout, timeUnit);
328+
}
329+
314330
/**
315331
* Returns the node stats indices stats. The {@code includePrevious} flag controls
316332
* if old shards stats will be aggregated as well (only for relevant stats, such as

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -783,11 +783,13 @@ private Node stop() {
783783
// In this case the process will be terminated even if the first call to close() has not finished yet.
784784
@Override
785785
public synchronized void close() throws IOException {
786-
if (lifecycle.started()) {
787-
stop();
788-
}
789-
if (!lifecycle.moveToClosed()) {
790-
return;
786+
synchronized (lifecycle) {
787+
if (lifecycle.started()) {
788+
stop();
789+
}
790+
if (!lifecycle.moveToClosed()) {
791+
return;
792+
}
791793
}
792794

793795
logger.info("closing ...");
@@ -835,21 +837,12 @@ public synchronized void close() throws IOException {
835837
toClose.add(injector.getInstance(ScriptService.class));
836838

837839
toClose.add(() -> stopWatch.stop().start("thread_pool"));
838-
// TODO this should really use ThreadPool.terminate()
839840
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
840-
toClose.add(() -> {
841-
try {
842-
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
843-
} catch (InterruptedException e) {
844-
// ignore
845-
}
846-
});
847-
848-
toClose.add(() -> stopWatch.stop().start("thread_pool_force_shutdown"));
849-
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
841+
// Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
842+
// See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
843+
// awaitClose if the node doesn't finish closing within the specified time.
850844
toClose.add(() -> stopWatch.stop());
851845

852-
853846
toClose.add(injector.getInstance(NodeEnvironment.class));
854847
toClose.add(injector.getInstance(PageCacheRecycler.class));
855848

@@ -860,6 +853,30 @@ public synchronized void close() throws IOException {
860853
logger.info("closed");
861854
}
862855

856+
/**
857+
* Wait for this node to be effectively closed.
858+
*/
859+
// synchronized to prevent running concurrently with close()
860+
public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
861+
if (lifecycle.closed() == false) {
862+
// We don't want to shutdown the threadpool or interrupt threads on a node that is not
863+
// closed yet.
864+
throw new IllegalStateException("Call close() first");
865+
}
866+
867+
868+
ThreadPool threadPool = injector.getInstance(ThreadPool.class);
869+
final boolean terminated = ThreadPool.terminate(threadPool, timeout, timeUnit);
870+
if (terminated) {
871+
// All threads terminated successfully. Because search, recovery and all other operations
872+
// that run on shards run in the threadpool, indices should be effectively closed by now.
873+
if (nodeService.awaitClose(0, TimeUnit.MILLISECONDS) == false) {
874+
throw new IllegalStateException("Some shards are still open after the threadpool terminated. " +
875+
"Something is leaking index readers or store references.");
876+
}
877+
}
878+
return terminated;
879+
}
863880

864881
/**
865882
* Returns {@code true} if the node is closed.

server/src/main/java/org/elasticsearch/node/NodeService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import java.io.Closeable;
4545
import java.io.IOException;
46+
import java.util.concurrent.TimeUnit;
4647

4748
public class NodeService implements Closeable {
4849
private final Settings settings;
@@ -135,4 +136,12 @@ public void close() throws IOException {
135136
IOUtils.close(indicesService);
136137
}
137138

139+
/**
140+
* Wait for the node to be effectively closed.
141+
* @see IndicesService#awaitClose(long, TimeUnit)
142+
*/
143+
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
144+
return indicesService.awaitClose(timeout, timeUnit);
145+
}
146+
138147
}

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ private InetSocketAddress bindToPort(final String name, final InetAddress hostAd
360360
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
361361
closeLock.writeLock().lock();
362362
try {
363+
// No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED
363364
if (lifecycle.initialized() == false && lifecycle.started() == false) {
364365
throw new IllegalStateException("transport has been stopped");
365366
}

server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ public void onFailure(Exception e) {
136136

137137
@Override
138138
public void close() {
139-
lifecycle.moveToStopped();
140-
lifecycle.moveToClosed();
139+
synchronized (lifecycle) {
140+
lifecycle.moveToStopped();
141+
lifecycle.moveToClosed();
142+
}
141143
}
142144

143145
private class ScheduledPing extends AbstractLifecycleRunnable {

0 commit comments

Comments
 (0)