From 68bb692b1b630d07dcb5587c2d9bfa792fe08841 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Thu, 17 Nov 2022 23:16:12 -0800 Subject: [PATCH 1/5] HDFS-16847: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. --- .../store/driver/impl/StateStoreFileBaseImpl.java | 2 +- .../driver/impl/StateStoreFileSystemImpl.java | 15 +++------------ 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index 871919594f57d..c4425e15ada40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -366,7 +366,7 @@ public boolean putAll( } } // Commit - if (!rename(recordPathTemp, recordPath)) { + if (success && !rename(recordPathTemp, recordPath)) { LOG.error("Failed committing record into {}", recordPath); success = false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java index e6bf159e2f597..c0b6f783b2ca3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -32,9 +32,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -82,17 +82,8 @@ protected boolean mkdir(String path) { @Override protected boolean rename(String src, String dst) { try { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem)fs; - dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE); - return true; - } else { - // Replace should be atomic but not available - if (fs.exists(new Path(dst))) { - fs.delete(new Path(dst), true); - } - return fs.rename(new Path(src), new Path(dst)); - } + FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE); + return true; } catch (Exception e) { LOG.error("Cannot rename {} to {}", src, dst, e); return false; From 8cd681835b248bf77fc40bbf8315b51d4c2ed4ba Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Mon, 21 Nov 2022 12:22:14 -0500 Subject: [PATCH 2/5] Adding unit test --- .../driver/impl/StateStoreFileBaseImpl.java | 11 ++++++++- .../driver/TestStateStoreDriverBase.java | 19 +++++++++++++++ .../driver/TestStateStoreFileSystem.java | 24 +++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index c4425e15ada40..fd02ae76897bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -88,6 +88,15 @@ protected abstract BufferedReader getReader( protected abstract BufferedWriter getWriter( String path); + /** + * Convenience method to allow to mocking of protected + * {@link #getWriter(String)} + */ + @VisibleForTesting + public BufferedWriter getBufferedWriter(String path) { + return getWriter(path); + } + /** * Check if a path exists. * @@ -348,7 +357,7 @@ public boolean putAll( for (Entry entry : toWrite.entrySet()) { String recordPath = entry.getKey(); String recordPathTemp = recordPath + "." + now() + TMP_MARK; - BufferedWriter writer = getWriter(recordPathTemp); + BufferedWriter writer = getBufferedWriter(recordPathTemp); try { T record = entry.getValue(); String line = serializeString(record); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index b8bb7c4d2d115..5ad01dce8e729 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -234,6 +234,25 @@ public void testInsert( assertEquals(11, records2.size()); } + public void testInsertWithErrorDuringWrite( + StateStoreDriver driver, Class recordClass) + throws IllegalArgumentException, IllegalAccessException, IOException { + + assertTrue(driver.removeAll(recordClass)); + QueryResult queryResult0 = driver.get(recordClass); + List records0 = queryResult0.getRecords(); + assertTrue(records0.isEmpty()); + + // Insert single + BaseRecord record = generateFakeRecord(recordClass); + driver.put(record, true, false); + + // Verify that no record was inserted. + QueryResult queryResult1 = driver.get(recordClass); + List records1 = queryResult1.getRecords(); + assertEquals(0, records1.size()); + } + public void testFetchErrors(StateStoreDriver driver, Class clazz) throws IllegalAccessException, IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java index 8c4b188cc47e3..4dce2a4e39463 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -17,16 +17,26 @@ */ package org.apache.hadoop.hdfs.server.federation.store.driver; +import java.io.BufferedWriter; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + /** * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. @@ -91,4 +101,18 @@ public void testMetrics() throws IllegalArgumentException, IllegalAccessException, IOException { testMetrics(getStateStoreDriver()); } + + @Test + public void testInsertWithErrorDuringWrite() + throws IllegalArgumentException, IllegalAccessException, IOException { + StateStoreFileBaseImpl driver = spy((StateStoreFileBaseImpl)getStateStoreDriver()); + doAnswer((Answer) a -> { + BufferedWriter writer = (BufferedWriter) a.callRealMethod(); + BufferedWriter spyWriter = spy(writer); + doThrow(IOException.class).when(spyWriter).write(any(String.class)); + return spyWriter; + }).when(driver).getBufferedWriter(any()); + + testInsertWithErrorDuringWrite(driver, MembershipState.class); + } } \ No newline at end of file From 4731c710d46e33e97180b05c95048d3656617789 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Mon, 21 Nov 2022 13:24:40 -0500 Subject: [PATCH 3/5] Scoping skipping of renames to each record. --- .../federation/store/driver/impl/StateStoreFileBaseImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index fd02ae76897bc..f1feeb8d1989a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -358,6 +358,7 @@ public boolean putAll( String recordPath = entry.getKey(); String recordPathTemp = recordPath + "." + now() + TMP_MARK; BufferedWriter writer = getBufferedWriter(recordPathTemp); + boolean recordWrittenSuccessfully = true; try { T record = entry.getValue(); String line = serializeString(record); @@ -365,17 +366,19 @@ public boolean putAll( } catch (IOException e) { LOG.error("Cannot write {}", recordPathTemp, e); success = false; + recordWrittenSuccessfully = false; } finally { if (writer != null) { try { writer.close(); } catch (IOException e) { + recordWrittenSuccessfully = false; LOG.error("Cannot close the writer for {}", recordPathTemp, e); } } } // Commit - if (success && !rename(recordPathTemp, recordPath)) { + if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) { LOG.error("Failed committing record into {}", recordPath); success = false; } From 5c7709c6df86622812edd1ba32a0af4304eef0e2 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Mon, 21 Nov 2022 20:57:39 -0500 Subject: [PATCH 4/5] Simplifying code --- .../driver/impl/StateStoreFileBaseImpl.java | 18 +++++------------- .../store/driver/impl/StateStoreFileImpl.java | 4 +++- .../driver/impl/StateStoreFileSystemImpl.java | 4 +++- .../store/driver/TestStateStoreFileSystem.java | 2 +- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index f1feeb8d1989a..afe5a669577c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -85,17 +85,9 @@ protected abstract BufferedReader getReader( * @param path Path of the record to write. * @return Writer for the record. */ - protected abstract BufferedWriter getWriter( - String path); - - /** - * Convenience method to allow to mocking of protected - * {@link #getWriter(String)} - */ @VisibleForTesting - public BufferedWriter getBufferedWriter(String path) { - return getWriter(path); - } + public abstract BufferedWriter getWriter( + String path); /** * Check if a path exists. @@ -357,16 +349,16 @@ public boolean putAll( for (Entry entry : toWrite.entrySet()) { String recordPath = entry.getKey(); String recordPathTemp = recordPath + "." + now() + TMP_MARK; - BufferedWriter writer = getBufferedWriter(recordPathTemp); - boolean recordWrittenSuccessfully = true; + BufferedWriter writer = getWriter(recordPathTemp); + boolean recordWrittenSuccessfully = false; try { T record = entry.getValue(); String line = serializeString(record); writer.write(line); + recordWrittenSuccessfully = true; } catch (IOException e) { LOG.error("Cannot write {}", recordPathTemp, e); success = false; - recordWrittenSuccessfully = false; } finally { if (writer != null) { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java index 9d2b1ab2fb73a..6ca2663716162 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -31,6 +31,7 @@ import java.util.List; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.slf4j.Logger; @@ -125,7 +126,8 @@ protected BufferedReader getReader(String filename) { } @Override - protected BufferedWriter getWriter(String filename) { + @VisibleForTesting + public BufferedWriter getWriter(String filename) { BufferedWriter writer = null; try { LOG.debug("Writing file: {}", filename); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java index c0b6f783b2ca3..ee34d8a4cabbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -139,7 +140,8 @@ protected BufferedReader getReader(String pathName) { } @Override - protected BufferedWriter getWriter(String pathName) { + @VisibleForTesting + public BufferedWriter getWriter(String pathName) { BufferedWriter writer = null; Path path = new Path(pathName); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java index 4dce2a4e39463..dbd4b9bdae2ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -111,7 +111,7 @@ public void testInsertWithErrorDuringWrite() BufferedWriter spyWriter = spy(writer); doThrow(IOException.class).when(spyWriter).write(any(String.class)); return spyWriter; - }).when(driver).getBufferedWriter(any()); + }).when(driver).getWriter(any()); testInsertWithErrorDuringWrite(driver, MembershipState.class); } From 49c60394726b8a9d417e9a104cead11b77488367 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Tue, 22 Nov 2022 14:57:34 -0500 Subject: [PATCH 5/5] Switching to try with resources --- .../driver/impl/StateStoreFileBaseImpl.java | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index afe5a669577c3..c93d919aea0a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -349,25 +349,15 @@ public boolean putAll( for (Entry entry : toWrite.entrySet()) { String recordPath = entry.getKey(); String recordPathTemp = recordPath + "." + now() + TMP_MARK; - BufferedWriter writer = getWriter(recordPathTemp); - boolean recordWrittenSuccessfully = false; - try { + boolean recordWrittenSuccessfully = true; + try (BufferedWriter writer = getWriter(recordPathTemp)) { T record = entry.getValue(); String line = serializeString(record); writer.write(line); - recordWrittenSuccessfully = true; } catch (IOException e) { LOG.error("Cannot write {}", recordPathTemp, e); + recordWrittenSuccessfully = false; success = false; - } finally { - if (writer != null) { - try { - writer.close(); - } catch (IOException e) { - recordWrittenSuccessfully = false; - LOG.error("Cannot close the writer for {}", recordPathTemp, e); - } - } } // Commit if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {