diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index c5989f60d1e5c..1a1d9beb85613 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -22,8 +22,10 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -59,6 +61,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -113,8 +116,7 @@ public void start(Settings settings, TransportService transportService, ClusterS } } - final PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter(); - final PersistedState persistedState; + PersistedState persistedState = null; boolean success = false; try { final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService, @@ -123,10 +125,10 @@ public void start(Settings settings, TransportService transportService, ClusterS .metaData(upgradeMetaDataForNode(metaData, metaDataIndexUpgradeService, metaDataUpgrader)) .build()); if (DiscoveryNode.isMasterNode(settings)) { - persistedState = new LucenePersistedState(persistenceWriter, currentTerm, clusterState); + persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); } else { persistedState = new 
AsyncLucenePersistedState(settings, transportService.getThreadPool(), - new LucenePersistedState(persistenceWriter, currentTerm, clusterState)); + new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState)); } if (DiscoveryNode.isDataNode(settings)) { metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality) @@ -139,7 +141,7 @@ public void start(Settings settings, TransportService transportService, ClusterS success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(persistenceWriter); + IOUtils.closeWhileHandlingException(persistedState); } } @@ -392,11 +394,15 @@ static class LucenePersistedState implements PersistedState { private long currentTerm; private ClusterState lastAcceptedState; - private final PersistedClusterStateService.Writer persistenceWriter; + private final PersistedClusterStateService persistedClusterStateService; - LucenePersistedState(PersistedClusterStateService.Writer persistenceWriter, long currentTerm, ClusterState lastAcceptedState) + // As the close method can be called concurrently with the other PersistedState methods, this class has extra protection in place. + private final AtomicReference persistenceWriter = new AtomicReference<>(); + boolean writeNextStateFully; + + LucenePersistedState(PersistedClusterStateService persistedClusterStateService, long currentTerm, ClusterState lastAcceptedState) throws IOException { - this.persistenceWriter = persistenceWriter; + this.persistedClusterStateService = persistedClusterStateService; this.currentTerm = currentTerm; this.lastAcceptedState = lastAcceptedState; // Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that @@ -406,7 +412,18 @@ static class LucenePersistedState implements PersistedState { // In the common case it's actually sufficient to commit() the existing state and not do any indexing. 
For instance, // this is true if there's only one data path on this master node, and the commit we just loaded was already written out // by this version of Elasticsearch. TODO TBD should we avoid indexing when possible? - persistenceWriter.writeFullStateAndCommit(currentTerm, lastAcceptedState); + final PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter(); + try { + writer.writeFullStateAndCommit(currentTerm, lastAcceptedState); + } catch (Exception e) { + try { + writer.close(); + } catch (Exception e2) { + e.addSuppressed(e2); + } + throw e; + } + persistenceWriter.set(writer); } @Override @@ -421,32 +438,74 @@ public ClusterState getLastAcceptedState() { @Override public void setCurrentTerm(long currentTerm) { - persistenceWriter.commit(currentTerm, lastAcceptedState.version()); + try { + if (writeNextStateFully) { + getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState); + writeNextStateFully = false; + } else { + getWriterSafe().commit(currentTerm, lastAcceptedState.version()); + } + } catch (Exception e) { + handleExceptionOnWrite(e); + } this.currentTerm = currentTerm; } @Override public void setLastAcceptedState(ClusterState clusterState) { try { - if (clusterState.term() != lastAcceptedState.term()) { - assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); - // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so - // it's simplest to write everything again. - persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState); + if (writeNextStateFully) { + getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState); + writeNextStateFully = false; } else { - // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. 
- persistenceWriter.writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState); + if (clusterState.term() != lastAcceptedState.term()) { + assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); + // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, + // so it's simplest to write everything again. + getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState); + } else { + // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. + getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState); + } } - } catch (IOException e) { - throw new UncheckedIOException(e); + } catch (Exception e) { + handleExceptionOnWrite(e); } lastAcceptedState = clusterState; } + private PersistedClusterStateService.Writer getWriterSafe() { + final PersistedClusterStateService.Writer writer = persistenceWriter.get(); + if (writer == null) { + throw new AlreadyClosedException("persisted state has been closed"); + } + if (writer.isOpen()) { + return writer; + } else { + try { + final PersistedClusterStateService.Writer newWriter = persistedClusterStateService.createWriter(); + if (persistenceWriter.compareAndSet(writer, newWriter)) { + return newWriter; + } else { + assert persistenceWriter.get() == null : "expected no concurrent calls to getWriterSafe"; + newWriter.close(); + throw new AlreadyClosedException("persisted state has been closed"); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } + } + + private void handleExceptionOnWrite(Exception e) { + writeNextStateFully = true; + throw ExceptionsHelper.convertToRuntime(e); + } + @Override public void close() throws IOException { - persistenceWriter.close(); + IOUtils.close(persistenceWriter.getAndSet(null)); } } } diff --git 
a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index b0f44632d0657..c426052ce04a5 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -41,6 +41,7 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.Bits; @@ -80,6 +81,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntPredicate; /** @@ -497,13 +499,17 @@ void flush() throws IOException { this.indexWriter.flush(); } - void commit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { + void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { final Map commitData = new HashMap<>(COMMIT_DATA_SIZE); commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm)); commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion)); commitData.put(NODE_VERSION_KEY, Integer.toString(Version.CURRENT.id)); commitData.put(NODE_ID_KEY, nodeId); indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.prepareCommit(); + } + + void commit() throws IOException { indexWriter.commit(); } @@ -520,6 +526,7 @@ public static class Writer implements Closeable { private final BigArrays bigArrays; boolean fullStateWritten = false; + private final AtomicBoolean closed = new AtomicBoolean(); private Writer(List metaDataIndexWriters, String nodeId, BigArrays bigArrays) { this.metaDataIndexWriters = metaDataIndexWriters; @@ -527,13 +534,39 @@ private Writer(List 
metaDataIndexWriters, String nodeId, Bi this.bigArrays = bigArrays; } + private void ensureOpen() { + if (closed.get()) { + throw new AlreadyClosedException("cluster state writer is closed already"); + } + } + + public boolean isOpen() { + return closed.get() == false; + } + + private void closeIfAnyIndexWriterHasTragedyOrIsClosed() { + if (metaDataIndexWriters.stream().map(writer -> writer.indexWriter) + .anyMatch(iw -> iw.getTragicException() != null || iw.isOpen() == false)) { + try { + close(); + } catch (Exception e) { + logger.warn("failed on closing cluster state writer", e); + } + } + } + /** * Overrides and commits the given current term and cluster state */ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException { - overwriteMetaData(clusterState.metaData()); - commit(currentTerm, clusterState.version()); - fullStateWritten = true; + ensureOpen(); + try { + overwriteMetaData(clusterState.metaData()); + commit(currentTerm, clusterState.version()); + fullStateWritten = true; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } } /** @@ -541,9 +574,14 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) */ void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) throws IOException { + ensureOpen(); assert fullStateWritten : "Need to write full state first before doing incremental writes"; - updateMetaData(previousClusterState.metaData(), clusterState.metaData()); - commit(currentTerm, clusterState.version()); + try { + updateMetaData(previousClusterState.metaData(), clusterState.metaData()); + commit(currentTerm, clusterState.version()); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } } /** @@ -634,23 +672,48 @@ private void addMetaData(MetaData metaData) throws IOException { } } - public void commit(long currentTerm, long lastAcceptedVersion) { + public void commit(long currentTerm, long 
lastAcceptedVersion) throws IOException { + ensureOpen(); try { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { - metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion); + metaDataIndexWriter.prepareCommit(nodeId, currentTerm, lastAcceptedVersion); + } + } catch (Exception e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed on closing cluster state writer", e2); + e.addSuppressed(e2); + } + throw e; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + try { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.commit(); } } catch (IOException e) { // The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the // data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and // retry from the beginning. + try { + close(); + } catch (Exception e2) { + e.addSuppressed(e2); + } throw new IOError(e); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); } } @Override public void close() throws IOException { - logger.trace("closing"); - IOUtils.close(metaDataIndexWriters); + logger.trace("closing PersistedClusterStateService.Writer"); + if (closed.compareAndSet(false, true)) { + IOUtils.close(metaDataIndexWriters); + } } private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index b4ac9a83db65f..64041e85d897a 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -865,8 +865,6 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); 
toClose.add(injector.getInstance(TransportService.class)); - toClose.add(() -> stopWatch.stop().start("gateway_meta_state")); - toClose.add(injector.getInstance(GatewayMetaState.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); @@ -882,8 +880,11 @@ public synchronized void close() throws IOException { // Don't call shutdownNow here, it might break ongoing operations on Lucene indices. // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in // awaitClose if the node doesn't finish closing within the specified time. - toClose.add(() -> stopWatch.stop().start("node_environment")); + toClose.add(() -> stopWatch.stop().start("gateway_meta_state")); + toClose.add(injector.getInstance(GatewayMetaState.class)); + + toClose.add(() -> stopWatch.stop().start("node_environment")); toClose.add(injector.getInstance(NodeEnvironment.class)); toClose.add(stopWatch::stop); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index af7443055e06f..04b58ee04e219 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.gateway; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -46,10 +49,14 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOError; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import 
java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -253,7 +260,7 @@ public void testMarkAcceptedConfigAsCommitted() throws IOException { try { gateway = newGatewayPersistedState(); - //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration + // generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration CoordinationMetaData coordinationMetaData; do { coordinationMetaData = createCoordinationMetaData(randomNonNegativeLong()); @@ -293,7 +300,7 @@ public void testStatePersistedOnLoad() throws IOException { final ClusterState state = createClusterState(randomNonNegativeLong(), MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState( - persistedClusterStateService.createWriter(), 42L, state)) { + persistedClusterStateService, 42L, state)) { } @@ -409,4 +416,96 @@ public void testDataOnlyNodePersistence() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } + public void testStatePersistenceWithIOIssues() throws IOException { + final AtomicReference ioExceptionRate = new AtomicReference<>(0.01d); + final List list = new ArrayList<>(); + final PersistedClusterStateService persistedClusterStateService = + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + @Override + Directory createDirectory(Path path) { + final MockDirectoryWrapper wrapper = newMockFSDirectory(path); + wrapper.setAllowRandomFileNotFoundException(randomBoolean()); + wrapper.setRandomIOExceptionRate(ioExceptionRate.get()); + wrapper.setRandomIOExceptionRateOnOpen(ioExceptionRate.get()); + list.add(wrapper); + return wrapper; + } + }; + 
ClusterState state = createClusterState(randomNonNegativeLong(), + MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); + long currentTerm = 42L; + try (GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState( + persistedClusterStateService, currentTerm, state)) { + + try { + if (randomBoolean()) { + final ClusterState newState = createClusterState(randomNonNegativeLong(), + MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); + persistedState.setLastAcceptedState(newState); + state = newState; + } else { + final long newTerm = currentTerm + 1; + persistedState.setCurrentTerm(newTerm); + currentTerm = newTerm; + } + } catch (IOError | Exception e) { + assertNotNull(ExceptionsHelper.unwrap(e, IOException.class)); + } + + ioExceptionRate.set(0.0d); + for (MockDirectoryWrapper wrapper : list) { + wrapper.setRandomIOExceptionRate(ioExceptionRate.get()); + wrapper.setRandomIOExceptionRateOnOpen(ioExceptionRate.get()); + } + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + if (randomBoolean()) { + final long version = randomNonNegativeLong(); + final String indexName = randomAlphaOfLength(10); + final IndexMetaData indexMetaData = createIndexMetaData(indexName, randomIntBetween(1, 5), randomNonNegativeLong()); + final MetaData metaData = MetaData.builder(). + persistentSettings(Settings.builder().put(randomAlphaOfLength(10), randomAlphaOfLength(10)).build()). + coordinationMetaData(createCoordinationMetaData(1L)). + put(indexMetaData, false). 
+ build(); + state = createClusterState(version, metaData); + persistedState.setLastAcceptedState(state); + } else { + currentTerm += 1; + persistedState.setCurrentTerm(currentTerm); + } + } + + assertEquals(state, persistedState.getLastAcceptedState()); + assertEquals(currentTerm, persistedState.getCurrentTerm()); + + } catch (IOError | Exception e) { + if (ioExceptionRate.get() == 0.0d) { + throw e; + } + assertNotNull(ExceptionsHelper.unwrap(e, IOException.class)); + return; + } + + nodeEnvironment.close(); + + // verify that the freshest state was rewritten to each data path + for (Path path : nodeEnvironment.nodeDataPaths()) { + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_DATA_SETTING.getKey(), path.toString()).build(); + try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) { + final PersistedClusterStateService newPersistedClusterStateService = + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); + assertFalse(onDiskState.empty()); + assertThat(onDiskState.currentTerm, equalTo(currentTerm)); + assertClusterStateEqual(state, + ClusterState.builder(ClusterName.DEFAULT) + .version(onDiskState.lastAcceptedVersion) + .metaData(onDiskState.metaData).build()); + } + } + } + } diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index cf16f21f288db..a05dc9fc560a8 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -374,7 +374,7 @@ public IndexOutput createOutput(String 
name, IOContext context) throws IOExcepti } } - public void testThrowsIOErrorOnExceptionDuringCommit() throws IOException { + public void testClosesWriterOnFatalError() throws IOException { final AtomicBoolean throwException = new AtomicBoolean(); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { @@ -384,8 +384,53 @@ public void testThrowsIOErrorOnExceptionDuringCommit() throws IOException { Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @Override - public void sync(Collection names) throws IOException { - if (throwException.get() && names.stream().anyMatch(n -> n.startsWith("pending_segments_"))) { + public void sync(Collection names) { + throw new OutOfMemoryError("simulated"); + } + }; + } + }; + + try (Writer writer = persistedClusterStateService.createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + final long newTerm = randomNonNegativeLong(); + final ClusterState newState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(UUIDs.randomBase64UUID(random())) + .clusterUUIDCommitted(true) + .version(randomLongBetween(1L, Long.MAX_VALUE))) + .incrementVersion().build(); + throwException.set(true); + assertThat(expectThrows(OutOfMemoryError.class, () -> { + if (randomBoolean()) { + writeState(writer, newTerm, newState, clusterState); + } else { + writer.commit(newTerm, newState.version()); + } + }).getMessage(), + containsString("simulated")); + assertFalse(writer.isOpen()); + } + + // check if we can open writer again + try (Writer ignored = persistedClusterStateService.createWriter()) { + + } + } + } + + public void testCrashesWithIOErrorOnCommitFailure() throws IOException { + final AtomicBoolean throwException = new AtomicBoolean(); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final PersistedClusterStateService 
persistedClusterStateService + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + @Override + Directory createDirectory(Path path) throws IOException { + return new FilterDirectory(super.createDirectory(path)) { + @Override + public void rename(String source, String dest) throws IOException { + if (throwException.get() && dest.startsWith("segments")) { throw new IOException("simulated"); } } @@ -403,8 +448,20 @@ public void sync(Collection names) throws IOException { .version(randomLongBetween(1L, Long.MAX_VALUE))) .incrementVersion().build(); throwException.set(true); - assertThat(expectThrows(IOError.class, () -> writeState(writer, newTerm, newState, clusterState)).getMessage(), + assertThat(expectThrows(IOError.class, () -> { + if (randomBoolean()) { + writeState(writer, newTerm, newState, clusterState); + } else { + writer.commit(newTerm, newState.version()); + } + }).getMessage(), containsString("simulated")); + assertFalse(writer.isOpen()); + } + + // check if we can open writer again + try (Writer ignored = persistedClusterStateService.createWriter()) { + } } }