diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java index e463591f5185..89edccc22538 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java @@ -73,33 +73,30 @@ protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, b return new RecoveredEditsWriter(region, regionEditsPath, w, seqId); } - protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, - List thrown) throws IOException { + /** + * abortRecoveredEditsWriter closes the editsWriter, but does not rename and finalize the + * recovered edits WAL files. Please see HBASE-28569. + */ + protected void abortRecoveredEditsWriter(RecoveredEditsWriter editsWriter, + List thrown) { + closeRecoveredEditsWriter(editsWriter, thrown); try { - editsWriter.writer.close(); + removeRecoveredEditsFile(editsWriter); } catch (IOException ioe) { - final String errorMsg = "Could not close recovered edits at " + editsWriter.path; - LOG.error(errorMsg, ioe); + final String errorMsg = "Failed removing recovered edits file at " + editsWriter.path; + LOG.error(errorMsg); updateStatusWithMsg(errorMsg); - thrown.add(ioe); + } + } + + protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter editsWriter, + List thrown) throws IOException { + if (!closeRecoveredEditsWriter(editsWriter, thrown)) { return null; } - final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote " - + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " - + (editsWriter.nanosSpent / 1000 / 1000) + " ms)"; - LOG.info(msg); - updateStatusWithMsg(msg); if (editsWriter.editsWritten == 0) { // just remove the empty recovered.edits file - if ( - walSplitter.walFS.exists(editsWriter.path) - && !walSplitter.walFS.delete(editsWriter.path, false) - ) { - final String errorMsg = "Failed deleting empty " + editsWriter.path; - LOG.warn(errorMsg); - updateStatusWithMsg(errorMsg); - throw new IOException("Failed deleting empty " + editsWriter.path); - } + removeRecoveredEditsFile(editsWriter); return null; } @@ -133,6 +130,37 @@ protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, return dst; } + private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, + List thrown) { + try { + editsWriter.writer.close(); + } catch (IOException ioe) { + final String errorMsg = "Could not close recovered edits at " + editsWriter.path; + LOG.error(errorMsg, ioe); + updateStatusWithMsg(errorMsg); + thrown.add(ioe); + return false; + } + final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote " + + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + + (editsWriter.nanosSpent / 1000 / 1000) + " ms)"; + LOG.info(msg); + updateStatusWithMsg(msg); + return true; + } + + private void removeRecoveredEditsFile(RecoveredEditsWriter editsWriter) throws IOException { + if ( + walSplitter.walFS.exists(editsWriter.path) + && !walSplitter.walFS.delete(editsWriter.path, false) + ) { + final String errorMsg = "Failed deleting empty " + editsWriter.path; + LOG.warn(errorMsg); + updateStatusWithMsg(errorMsg); + throw new IOException("Failed deleting empty " + editsWriter.path); + } + } + @Override public boolean keepRegionEvent(WAL.Entry entry) { ArrayList cells = entry.getEdit().getCells(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java index 150521b7957f..e0a9e40e4c93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java @@ -70,7 +70,7 @@ public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName), (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten); List thrown = new ArrayList<>(); - Path dst = closeRecoveredEditsWriter(writer, thrown); + Path dst = closeRecoveredEditsWriterAndFinalizeEdits(writer, thrown); splits.add(dst); openingWritersNum.decrementAndGet(); if (!thrown.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java index d8cf1371c341..e5cc3bbaf730 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java @@ -89,24 +89,40 @@ private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] @Override public List close() throws IOException { - boolean isSuccessful = true; + boolean isSuccessful; try { isSuccessful = finishWriterThreads(); - } finally { - isSuccessful &= closeWriters(); + } catch (IOException e) { + closeWriters(false); + throw e; + } + if (!isSuccessful) { + // Even if an exception is not thrown, finishWriterThreads() not being successful is an + // error case where the WAL files should not be finalized. + closeWriters(false); + return null; } + isSuccessful = closeWriters(true); return isSuccessful ? splits : null; } /** - * Close all of the output streams. + * Close all the output streams. + * @param finalizeEdits true in the successful close case, false when we don't want to rename and + * finalize the temporary, possibly corrupted WAL files, such as when there + * was a previous failure or exception. Please see HBASE-28569. * @return true when there is no error. */ - private boolean closeWriters() throws IOException { + boolean closeWriters(boolean finalizeEdits) throws IOException { List thrown = Lists.newArrayList(); for (RecoveredEditsWriter writer : writers.values()) { closeCompletionService.submit(() -> { - Path dst = closeRecoveredEditsWriter(writer, thrown); + if (!finalizeEdits) { + abortRecoveredEditsWriter(writer, thrown); + LOG.trace("Aborted edits at {}", writer.path); + return null; + } + Path dst = closeRecoveredEditsWriterAndFinalizeEdits(writer, thrown); LOG.trace("Closed {}", dst); splits.add(dst); return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRecoveredEditsOutputSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRecoveredEditsOutputSink.java new file mode 100644 index 000000000000..06ed79a04a72 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRecoveredEditsOutputSink.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.io.InterruptedIOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRecoveredEditsOutputSink { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRecoveredEditsOutputSink.class); + + private static WALFactory wals; + private static FileSystem fs; + private static Path rootDir; + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private static RecoveredEditsOutputSink outputSink; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(WALFactory.WAL_PROVIDER, "filesystem"); + rootDir = TEST_UTIL.createRootDir(); + fs = CommonFSUtils.getRootDirFileSystem(conf); + wals = new WALFactory(conf, "testRecoveredEditsOutputSinkWALFactory"); + WALSplitter splitter = new WALSplitter(wals, conf, rootDir, fs, rootDir, fs); + WALSplitter.PipelineController pipelineController = new WALSplitter.PipelineController(); + EntryBuffers sink = new EntryBuffers(pipelineController, 1024 * 1024); + outputSink = new RecoveredEditsOutputSink(splitter, pipelineController, sink, 3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + wals.close(); + fs.delete(rootDir, true); + } + + @Test + public void testCloseSuccess() throws IOException { + RecoveredEditsOutputSink spyOutputSink = Mockito.spy(outputSink); + spyOutputSink.close(); + Mockito.verify(spyOutputSink, Mockito.times(1)).finishWriterThreads(); + Mockito.verify(spyOutputSink, Mockito.times(1)).closeWriters(true); + } + + /** + * When a WAL split is interrupted (ex. by a RegionServer abort), the thread join in + * finishWriterThreads() will get interrupted, rethrowing the exception without stopping the + * writer threads. Test to ensure that when this happens, RecoveredEditsOutputSink.close() does + * not rename the recoveredEdits WAL files as this can cause corruption. Please see HBASE-28569. + * However, the writers must still be closed. + */ + @Test + public void testCloseWALSplitInterrupted() throws IOException { + RecoveredEditsOutputSink spyOutputSink = Mockito.spy(outputSink); + // The race condition will lead to an InterruptedException to be caught by finishWriterThreads() + // which is then rethrown as an InterruptedIOException. + Mockito.doThrow(new InterruptedIOException()).when(spyOutputSink).finishWriterThreads(); + assertThrows(InterruptedIOException.class, spyOutputSink::close); + Mockito.verify(spyOutputSink, Mockito.times(1)).finishWriterThreads(); + Mockito.verify(spyOutputSink, Mockito.times(1)).closeWriters(false); + } + + /** + * When finishWriterThreads fails but does not throw an exception, ensure the writers are handled + * like in the exception case - the writers are closed but the recoveredEdits WAL files are not + * renamed. + */ + @Test + public void testCloseWALFinishWriterThreadsFailed() throws IOException { + RecoveredEditsOutputSink spyOutputSink = Mockito.spy(outputSink); + Mockito.doReturn(false).when(spyOutputSink).finishWriterThreads(); + spyOutputSink.close(); + Mockito.verify(spyOutputSink, Mockito.times(1)).finishWriterThreads(); + Mockito.verify(spyOutputSink, Mockito.times(1)).closeWriters(false); + } +}