From 75ec3eebdc492d3e43e86fd4c194d3287d64c017 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 9 Jan 2020 11:45:59 +0100 Subject: [PATCH 1/6] Make cluster state writer resilient to disk issues --- .../gateway/GatewayMetaState.java | 96 +++++++++++++---- .../gateway/PersistedClusterStateService.java | 60 ++++++++--- .../java/org/elasticsearch/node/Node.java | 7 +- .../GatewayMetaStatePersistedStateTests.java | 102 +++++++++++++++++- .../PersistedClusterStateServiceTests.java | 23 ++-- 5 files changed, 243 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index c5989f60d1e5c..0ece37f29b54a 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -22,8 +22,10 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -59,6 +61,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -113,8 +116,7 @@ public void start(Settings settings, TransportService transportService, ClusterS } } - final PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter(); - final PersistedState persistedState; + PersistedState persistedState = null; boolean success = false; try { final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService, @@ -123,10 +125,10 @@ public void start(Settings settings, TransportService transportService, ClusterS .metaData(upgradeMetaDataForNode(metaData, metaDataIndexUpgradeService, metaDataUpgrader)) .build()); if (DiscoveryNode.isMasterNode(settings)) { - persistedState = new LucenePersistedState(persistenceWriter, currentTerm, clusterState); + persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); } else { persistedState = new AsyncLucenePersistedState(settings, transportService.getThreadPool(), - new LucenePersistedState(persistenceWriter, currentTerm, clusterState)); + new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState)); } if (DiscoveryNode.isDataNode(settings)) { metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality) @@ -139,7 +141,7 @@ public void start(Settings settings, TransportService transportService, ClusterS success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(persistenceWriter); + IOUtils.closeWhileHandlingException(persistedState); } } @@ -392,11 +394,15 @@ static class LucenePersistedState implements PersistedState { private long currentTerm; private ClusterState lastAcceptedState; - private final PersistedClusterStateService.Writer persistenceWriter; + private final PersistedClusterStateService persistedClusterStateService; - LucenePersistedState(PersistedClusterStateService.Writer persistenceWriter, long currentTerm, ClusterState lastAcceptedState) + // As the close method can be concurrently called to the other PersistedState methods, this class has extra protection in place. + private final AtomicReference persistenceWriter = new AtomicReference<>(); + boolean writeNextStateFully; + + LucenePersistedState(PersistedClusterStateService persistedClusterStateService, long currentTerm, ClusterState lastAcceptedState) throws IOException { - this.persistenceWriter = persistenceWriter; + this.persistedClusterStateService = persistedClusterStateService; this.currentTerm = currentTerm; this.lastAcceptedState = lastAcceptedState; // Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that @@ -406,7 +412,14 @@ static class LucenePersistedState implements PersistedState { // In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance, // this is true if there's only one data path on this master node, and the commit we just loaded was already written out // by this version of Elasticsearch. TODO TBD should we avoid indexing when possible? - persistenceWriter.writeFullStateAndCommit(currentTerm, lastAcceptedState); + final PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter(); + try { + writer.writeFullStateAndCommit(currentTerm, lastAcceptedState); + } catch (Exception e) { + writer.close(); + throw e; + } + persistenceWriter.set(writer); } @Override @@ -419,34 +432,77 @@ public ClusterState getLastAcceptedState() { return lastAcceptedState; } + private PersistedClusterStateService.Writer getWriterSafe() { + PersistedClusterStateService.Writer writer = persistenceWriter.get(); + if (writer == null) { + throw new AlreadyClosedException("persisted state has been closed"); + } + return writer; + } + @Override public void setCurrentTerm(long currentTerm) { - persistenceWriter.commit(currentTerm, lastAcceptedState.version()); + reloadWriterIfNecessary(); + try { + if (writeNextStateFully) { + getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState); + writeNextStateFully = false; + } else { + getWriterSafe().commit(currentTerm, lastAcceptedState.version()); + } + } catch (Exception e) { + handleExceptionOnWrite(e); + } this.currentTerm = currentTerm; } @Override public void setLastAcceptedState(ClusterState clusterState) { + reloadWriterIfNecessary(); try { - if (clusterState.term() != lastAcceptedState.term()) { - assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); - // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so - // it's simplest to write everything again. - persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState); + if (writeNextStateFully) { + getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState); + writeNextStateFully = false; } else { - // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. - persistenceWriter.writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState); + if (clusterState.term() != lastAcceptedState.term()) { + assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); + // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so + // it's simplest to write everything again. + getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState); + } else { + // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. + getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState); + } } - } catch (IOException e) { - throw new UncheckedIOException(e); + } catch (Exception e) { + handleExceptionOnWrite(e); } lastAcceptedState = clusterState; } + private void reloadWriterIfNecessary() { + final PersistedClusterStateService.Writer writer = getWriterSafe(); + if (writer.isOpen() == false) { + try { + final PersistedClusterStateService.Writer newWriter = persistedClusterStateService.createWriter(); + if (persistenceWriter.compareAndSet(writer, newWriter) == false) { + newWriter.close(); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } + } + + private void handleExceptionOnWrite(Exception e) { + writeNextStateFully = true; + throw ExceptionsHelper.convertToRuntime(e); + } + @Override public void close() throws IOException { - persistenceWriter.close(); + IOUtils.close(persistenceWriter.getAndSet(null)); } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index b0f44632d0657..0cd46f939f234 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -41,6 +41,7 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.Bits; @@ -80,6 +81,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntPredicate; /** @@ -520,6 +522,7 @@ public static class Writer implements Closeable { private final BigArrays bigArrays; boolean fullStateWritten = false; + private final AtomicBoolean closed = new AtomicBoolean(); private Writer(List metaDataIndexWriters, String nodeId, BigArrays bigArrays) { this.metaDataIndexWriters = metaDataIndexWriters; @@ -527,13 +530,39 @@ private Writer(List metaDataIndexWriters, String nodeId, Bi this.bigArrays = bigArrays; } + private void ensureOpen() { + if (closed.get()) { + throw new AlreadyClosedException("cluster state writer is closed already"); + } + } + + public boolean isOpen() { + return closed.get() == false; + } + + private void closeIfAnyIndexWriterHasTragedyOrIsClosed() { + if (metaDataIndexWriters.stream().map(writer -> writer.indexWriter) + .anyMatch(iw -> iw.getTragicException() != null || iw.isOpen() == false)) { + try { + close(); + } catch (Exception e) { + logger.warn("failed on closing cluster state writer", e); + } + } + } + /** * Overrides and commits the given current term and cluster state */ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException { - overwriteMetaData(clusterState.metaData()); - commit(currentTerm, clusterState.version()); - fullStateWritten = true; + ensureOpen(); + try { + overwriteMetaData(clusterState.metaData()); + commit(currentTerm, clusterState.version()); + fullStateWritten = true; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } } /** @@ -541,9 +570,14 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) */ void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) throws IOException { + ensureOpen(); assert fullStateWritten : "Need to write full state first before doing incremental writes"; - updateMetaData(previousClusterState.metaData(), clusterState.metaData()); - commit(currentTerm, clusterState.version()); + try { + updateMetaData(previousClusterState.metaData(), clusterState.metaData()); + commit(currentTerm, clusterState.version()); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } } /** @@ -634,23 +668,23 @@ private void addMetaData(MetaData metaData) throws IOException { } } - public void commit(long currentTerm, long lastAcceptedVersion) { + public void commit(long currentTerm, long lastAcceptedVersion) throws IOException { + ensureOpen(); try { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion); } - } catch (IOException e) { - // The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the - // data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and - // retry from the beginning. - throw new IOError(e); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); } } @Override public void close() throws IOException { - logger.trace("closing"); - IOUtils.close(metaDataIndexWriters); + logger.trace("closing PersistedClusterStateService.Writer"); + if (closed.compareAndSet(false, true)) { + IOUtils.close(metaDataIndexWriters); + } } private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index b4ac9a83db65f..64041e85d897a 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -865,8 +865,6 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); - toClose.add(() -> stopWatch.stop().start("gateway_meta_state")); - toClose.add(injector.getInstance(GatewayMetaState.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); @@ -882,8 +880,11 @@ public synchronized void close() throws IOException { // Don't call shutdownNow here, it might break ongoing operations on Lucene indices. // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in // awaitClose if the node doesn't finish closing within the specified time. - toClose.add(() -> stopWatch.stop().start("node_environment")); + toClose.add(() -> stopWatch.stop().start("gateway_meta_state")); + toClose.add(injector.getInstance(GatewayMetaState.class)); + + toClose.add(() -> stopWatch.stop().start("node_environment")); toClose.add(injector.getInstance(NodeEnvironment.class)); toClose.add(stopWatch::stop); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index af7443055e06f..0767d589a09c0 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.gateway; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -48,8 +51,11 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -253,7 +259,7 @@ public void testMarkAcceptedConfigAsCommitted() throws IOException { try { gateway = newGatewayPersistedState(); - //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration + // generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration CoordinationMetaData coordinationMetaData; do { coordinationMetaData = createCoordinationMetaData(randomNonNegativeLong()); @@ -293,7 +299,7 @@ public void testStatePersistedOnLoad() throws IOException { final ClusterState state = createClusterState(randomNonNegativeLong(), MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState( - persistedClusterStateService.createWriter(), 42L, state)) { + persistedClusterStateService, 42L, state)) { } @@ -409,4 +415,96 @@ public void testDataOnlyNodePersistence() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } + public void testStatePersistenceWithIOIssues() throws IOException { + final AtomicReference ioExceptionRate = new AtomicReference<>(0.01d); + final List list = new ArrayList<>(); + final PersistedClusterStateService persistedClusterStateService = + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + @Override + Directory createDirectory(Path path) { + final MockDirectoryWrapper wrapper = newMockFSDirectory(path); + wrapper.setAllowRandomFileNotFoundException(randomBoolean()); + wrapper.setRandomIOExceptionRate(ioExceptionRate.get()); + wrapper.setRandomIOExceptionRateOnOpen(ioExceptionRate.get()); + list.add(wrapper); + return wrapper; + } + }; + ClusterState state = createClusterState(randomNonNegativeLong(), + MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); + long currentTerm = 42L; + try (GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState( + persistedClusterStateService, currentTerm, state)) { + + try { + if (randomBoolean()) { + final ClusterState newState = createClusterState(randomNonNegativeLong(), + MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); + persistedState.setLastAcceptedState(newState); + state = newState; + } else { + final long newTerm = currentTerm + 1; + persistedState.setCurrentTerm(newTerm); + currentTerm = newTerm; + } + } catch (Exception e) { + assertNotNull(ExceptionsHelper.unwrap(e, IOException.class)); + } + + ioExceptionRate.set(0.0d); + for (MockDirectoryWrapper wrapper : list) { + wrapper.setRandomIOExceptionRate(ioExceptionRate.get()); + wrapper.setRandomIOExceptionRateOnOpen(ioExceptionRate.get()); + } + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + if (randomBoolean()) { + final long version = randomNonNegativeLong(); + final String indexName = randomAlphaOfLength(10); + final IndexMetaData indexMetaData = createIndexMetaData(indexName, randomIntBetween(1, 5), randomNonNegativeLong()); + final MetaData metaData = MetaData.builder(). + persistentSettings(Settings.builder().put(randomAlphaOfLength(10), randomAlphaOfLength(10)).build()). + coordinationMetaData(createCoordinationMetaData(1L)). + put(indexMetaData, false). + build(); + state = createClusterState(version, metaData); + persistedState.setLastAcceptedState(state); + } else { + currentTerm += 1; + persistedState.setCurrentTerm(currentTerm); + } + } + + assertEquals(state, persistedState.getLastAcceptedState()); + assertEquals(currentTerm, persistedState.getCurrentTerm()); + + } catch (Exception e) { + if (ioExceptionRate.get() == 0.0d) { + throw e; + } + assertNotNull(ExceptionsHelper.unwrap(e, IOException.class)); + return; + } + + nodeEnvironment.close(); + + // verify that the freshest state was rewritten to each data path + for (Path path : nodeEnvironment.nodeDataPaths()) { + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_DATA_SETTING.getKey(), path.toString()).build(); + try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) { + final PersistedClusterStateService newPersistedClusterStateService = + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); + assertFalse(onDiskState.empty()); + assertThat(onDiskState.currentTerm, equalTo(currentTerm)); + assertClusterStateEqual(state, + ClusterState.builder(ClusterName.DEFAULT) + .version(onDiskState.lastAcceptedVersion) + .metaData(onDiskState.metaData).build()); + } + } + } + } diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index cf16f21f288db..582d7c69a8468 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -46,7 +46,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; -import java.io.IOError; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; @@ -374,7 +373,7 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti } } - public void testThrowsIOErrorOnExceptionDuringCommit() throws IOException { + public void testClosesWriterOnFatalError() throws IOException { final AtomicBoolean throwException = new AtomicBoolean(); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { @@ -384,10 +383,8 @@ public void testThrowsIOErrorOnExceptionDuringCommit() throws IOException { Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @Override - public void sync(Collection names) throws IOException { - if (throwException.get() && names.stream().anyMatch(n -> n.startsWith("pending_segments_"))) { - throw new IOException("simulated"); - } + public void sync(Collection names) { + throw new OutOfMemoryError("simulated"); } }; } @@ -403,8 +400,20 @@ public void sync(Collection names) throws IOException { .version(randomLongBetween(1L, Long.MAX_VALUE))) .incrementVersion().build(); throwException.set(true); - assertThat(expectThrows(IOError.class, () -> writeState(writer, newTerm, newState, clusterState)).getMessage(), + assertThat(expectThrows(OutOfMemoryError.class, () -> { + if (randomBoolean()) { + writeState(writer, newTerm, newState, clusterState); + } else { + writer.commit(newTerm, newState.version()); + } + }).getMessage(), containsString("simulated")); + assertFalse(writer.isOpen()); + } + + // check if we can open writer again + try (Writer ignored = persistedClusterStateService.createWriter()) { + } } } From 1c7eb4c7dc61c3bfc41e111b4d9c5313c43a3e5c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 9 Jan 2020 18:00:26 +0100 Subject: [PATCH 2/6] checkstyle --- .../main/java/org/elasticsearch/gateway/GatewayMetaState.java | 4 ++-- .../elasticsearch/gateway/PersistedClusterStateService.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 0ece37f29b54a..13d8b8285e551 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -466,8 +466,8 @@ public void setLastAcceptedState(ClusterState clusterState) { } else { if (clusterState.term() != lastAcceptedState.term()) { assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); - // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so - // it's simplest to write everything again. + // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, + // so it's simplest to write everything again. getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState); } else { // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 0cd46f939f234..ca20140895c58 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -71,7 +71,6 @@ import java.io.Closeable; import java.io.FilterOutputStream; -import java.io.IOError; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; From 333f42a417882d3d34a3daf25788f5110ead26f1 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 10 Jan 2020 14:29:37 +0100 Subject: [PATCH 3/6] inline reloadWriterIfNecessary --- .../gateway/GatewayMetaState.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 13d8b8285e551..1a1d9beb85613 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -416,7 +416,11 @@ static class LucenePersistedState implements PersistedState { try { writer.writeFullStateAndCommit(currentTerm, lastAcceptedState); } catch (Exception e) { - writer.close(); + try { + writer.close(); + } catch (Exception e2) { + e.addSuppressed(e2); + } throw e; } persistenceWriter.set(writer); @@ -432,17 +436,8 @@ public ClusterState getLastAcceptedState() { return lastAcceptedState; } - private PersistedClusterStateService.Writer getWriterSafe() { - PersistedClusterStateService.Writer writer = persistenceWriter.get(); - if (writer == null) { - throw new AlreadyClosedException("persisted state has been closed"); - } - return writer; - } - @Override public void setCurrentTerm(long currentTerm) { - reloadWriterIfNecessary(); try { if (writeNextStateFully) { getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState); @@ -458,7 +453,6 @@ public void setCurrentTerm(long currentTerm) { @Override public void setLastAcceptedState(ClusterState clusterState) { - reloadWriterIfNecessary(); try { if (writeNextStateFully) { getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState); @@ -481,13 +475,22 @@ public void setLastAcceptedState(ClusterState clusterState) { lastAcceptedState = clusterState; } - private void reloadWriterIfNecessary() { - final PersistedClusterStateService.Writer writer = getWriterSafe(); - if (writer.isOpen() == false) { + private PersistedClusterStateService.Writer getWriterSafe() { + final PersistedClusterStateService.Writer writer = persistenceWriter.get(); + if (writer == null) { + throw new AlreadyClosedException("persisted state has been closed"); + } + if (writer.isOpen()) { + return writer; + } else { try { final PersistedClusterStateService.Writer newWriter = persistedClusterStateService.createWriter(); - if (persistenceWriter.compareAndSet(writer, newWriter) == false) { + if (persistenceWriter.compareAndSet(writer, newWriter)) { + return newWriter; + } else { + assert persistenceWriter.get() == null : "expected no concurrent calls to getWriterSafe"; newWriter.close(); + throw new AlreadyClosedException("persisted state has been closed"); } } catch (Exception e) { throw ExceptionsHelper.convertToRuntime(e); From 7082947b86ac9d95cf66b98c83025c03a84e892e Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 10 Jan 2020 15:39:15 +0100 Subject: [PATCH 4/6] David likes crashes --- .../gateway/PersistedClusterStateService.java | 33 ++++++++++++- .../GatewayMetaStatePersistedStateTests.java | 5 +- .../PersistedClusterStateServiceTests.java | 48 +++++++++++++++++++ 3 files changed, 82 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index ca20140895c58..e97ccf68dc03e 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -71,6 +71,7 @@ import java.io.Closeable; import java.io.FilterOutputStream; +import java.io.IOError; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -498,13 +499,17 @@ void flush() throws IOException { this.indexWriter.flush(); } - void commit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { + void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { final Map commitData = new HashMap<>(COMMIT_DATA_SIZE); commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm)); commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion)); commitData.put(NODE_VERSION_KEY, Integer.toString(Version.CURRENT.id)); commitData.put(NODE_ID_KEY, nodeId); indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.prepareCommit(); + } + + void commit() throws IOException { indexWriter.commit(); } @@ -671,7 +676,31 @@ public void commit(long currentTerm, long lastAcceptedVersion) throws IOExceptio ensureOpen(); try { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { - metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion); + metaDataIndexWriter.prepareCommit(nodeId, currentTerm, lastAcceptedVersion); + } + } catch (Exception e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed on closing cluster state writer", e2); + e.addSuppressed(e2); + } + throw e; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + try { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.commit(); + } + } catch (IOException e) { + // The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the + // data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and + // retry from the beginning. + try { + close(); + } finally { + throw new IOError(e); } } finally { closeIfAnyIndexWriterHasTragedyOrIsClosed(); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 0767d589a09c0..04b58ee04e219 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOError; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; @@ -447,7 +448,7 @@ Directory createDirectory(Path path) { persistedState.setCurrentTerm(newTerm); currentTerm = newTerm; } - } catch (Exception e) { + } catch (IOError | Exception e) { assertNotNull(ExceptionsHelper.unwrap(e, IOException.class)); } @@ -478,7 +479,7 @@ Directory createDirectory(Path path) { assertEquals(state, persistedState.getLastAcceptedState()); assertEquals(currentTerm, persistedState.getCurrentTerm()); - } catch (Exception e) { + } catch (IOError | Exception e) { if (ioExceptionRate.get() == 0.0d) { throw e; } diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 582d7c69a8468..a05dc9fc560a8 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import java.io.IOError; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; @@ -418,6 +419,53 @@ public void sync(Collection names) { } } + public void testCrashesWithIOErrorOnCommitFailure() throws IOException { + final AtomicBoolean throwException = new AtomicBoolean(); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final PersistedClusterStateService persistedClusterStateService + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + @Override + Directory createDirectory(Path path) throws IOException { + return new FilterDirectory(super.createDirectory(path)) { + @Override + public void rename(String source, String dest) throws IOException { + if (throwException.get() && dest.startsWith("segments")) { + throw new IOException("simulated"); + } + } + }; + } + }; + + try (Writer writer = persistedClusterStateService.createWriter()) { + final ClusterState clusterState = loadPersistedClusterState(persistedClusterStateService); + final long newTerm = randomNonNegativeLong(); + final ClusterState newState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(UUIDs.randomBase64UUID(random())) + .clusterUUIDCommitted(true) + .version(randomLongBetween(1L, Long.MAX_VALUE))) + .incrementVersion().build(); + throwException.set(true); + assertThat(expectThrows(IOError.class, () -> { + if (randomBoolean()) { + writeState(writer, newTerm, newState, clusterState); + } else { + writer.commit(newTerm, newState.version()); + } + }).getMessage(), + containsString("simulated")); + assertFalse(writer.isOpen()); + } + + // check if we can open writer again + try (Writer ignored = persistedClusterStateService.createWriter()) { + + } + } + } + public void testFailsIfGlobalMetadataIsMissing() throws IOException { // if someone attempted surgery on the metadata index by hand, e.g. deleting broken segments, then maybe the global metadata // isn't there any more From 069730c015f3904309f961e5bf9946ec254c8394 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 10 Jan 2020 15:53:13 +0100 Subject: [PATCH 5/6] too fancy for Java --- .../elasticsearch/gateway/PersistedClusterStateService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index e97ccf68dc03e..6c1de71029d33 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -699,7 +699,8 @@ public void commit(long currentTerm, long lastAcceptedVersion) throws IOExceptio // retry from the beginning. try { close(); - } finally { + } catch (Exception e2) { + e.addSuppressed(e2); throw new IOError(e); } } finally { From 8a9c8641afa0abde44afd8cd70f46c07f4d5ca73 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 10 Jan 2020 16:05:21 +0100 Subject: [PATCH 6/6] duh --- .../org/elasticsearch/gateway/PersistedClusterStateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 6c1de71029d33..c426052ce04a5 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -701,8 +701,8 @@ public void commit(long currentTerm, long lastAcceptedVersion) throws IOExceptio close(); } catch (Exception e2) { e.addSuppressed(e2); - throw new IOError(e); } + throw new IOError(e); } finally { closeIfAnyIndexWriterHasTragedyOrIsClosed(); }