Skip to content

Commit ec747bc

Browse files
authored
HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
Signed-off-by: Andrew Purtell <[email protected]> Signed-off-by: Duo Zhang <[email protected]>
1 parent f022692 commit ec747bc

File tree

6 files changed

+39
-47
lines changed

6 files changed

+39
-47
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
217217
}
218218
return res;
219219
} catch (IOException e) {
220-
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
221-
if (logFile != archivedLog) {
220+
Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
221+
// archivedLog can be null if unable to locate in archiveDir.
222+
if (archivedLog != null) {
222223
openReader(archivedLog);
223224
// Try call again in recursion
224225
return nextKeyValue();

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNotEquals;
2223
import static org.junit.Assert.assertTrue;
24+
import java.io.IOException;
2325
import java.util.List;
2426
import java.util.NavigableMap;
2527
import java.util.TreeMap;
@@ -31,6 +33,7 @@
3133
import org.apache.hadoop.hbase.HBaseTestingUtil;
3234
import org.apache.hadoop.hbase.HConstants;
3335
import org.apache.hadoop.hbase.KeyValue;
36+
import org.apache.hadoop.hbase.ServerName;
3437
import org.apache.hadoop.hbase.TableName;
3538
import org.apache.hadoop.hbase.client.RegionInfo;
3639
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -92,6 +95,11 @@ private static String getName() {
9295
return "TestWALRecordReader";
9396
}
9497

98+
private static String getServerName() {
99+
ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
100+
return serverName.toString();
101+
}
102+
95103
@Before
96104
public void setUp() throws Exception {
97105
fs.delete(hbaseDir, true);
@@ -282,7 +290,6 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
282290
LOG.debug("log="+logDir+" file="+ split.getLogFileName());
283291

284292
testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
285-
286293
}
287294

288295
protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
@@ -335,13 +342,16 @@ private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2)
335342
// Move log file to archive directory
336343
// While WAL record reader is open
337344
WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
338-
339345
Path logFile = new Path(split_.getLogFileName());
340-
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
341-
boolean result = fs.rename(logFile, archivedLog);
342-
assertTrue(result);
343-
result = fs.exists(archivedLog);
344-
assertTrue(result);
346+
Path archivedLogDir = getWALArchiveDir(conf);
347+
Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
348+
assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
349+
350+
assertTrue(fs.rename(logFile, archivedLogLocation));
351+
assertTrue(fs.exists(archivedLogDir));
352+
assertFalse(fs.exists(logFile));
353+
// TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
354+
// TODO: the archivedLogLocation to read next key value.
345355
assertTrue(reader.nextKeyValue());
346356
cell = reader.getCurrentValue().getCells().get(0);
347357
if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
@@ -353,4 +363,10 @@ private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2)
353363
}
354364
reader.close();
355365
}
366+
367+
private Path getWALArchiveDir(Configuration conf) throws IOException {
368+
Path rootDir = CommonFSUtils.getWALRootDir(conf);
369+
String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
370+
return new Path(rootDir, archiveDir);
371+
}
356372
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.replication.regionserver;
1919

20-
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
20+
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
2121
import java.io.FileNotFoundException;
2222
import java.io.IOException;
2323
import java.lang.reflect.InvocationTargetException;
@@ -396,8 +396,12 @@ private long getFileSize(Path currentPath) throws IOException {
396396
try {
397397
fileSize = fs.getContentSummary(currentPath).getLength();
398398
} catch (FileNotFoundException e) {
399-
currentPath = getArchivedLogPath(currentPath, conf);
400-
fileSize = fs.getContentSummary(currentPath).getLength();
399+
Path archivedLogPath = findArchivedLog(currentPath, conf);
400+
// archivedLogPath can be null if unable to locate in archiveDir.
401+
if (archivedLogPath == null) {
402+
throw new FileNotFoundException("Couldn't find path: " + currentPath);
403+
}
404+
fileSize = fs.getContentSummary(archivedLogPath).getLength();
401405
}
402406
return fileSize;
403407
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) {
280280
if (!fs.exists(path)) {
281281
// There is a chance that wal has moved to oldWALs directory, so look there also.
282282
path = AbstractFSWALProvider.findArchivedLog(path, conf);
283-
// path is null if it couldn't find archive path.
283+
// path can be null if unable to locate in archiveDir.
284284
}
285285
if (path != null && fs.getFileStatus(path).getLen() == 0) {
286286
LOG.warn("Forcing removal of 0 length log in queue: {}", path);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ private boolean openNextLog() throws IOException {
319319
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
320320
// If the log was archived, continue reading from there
321321
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
322+
// archivedLog can be null if unable to locate in archiveDir.
322323
if (archivedLog != null) {
323324
openReader(archivedLog);
324325
} else {
@@ -384,6 +385,7 @@ private void resetReader() throws IOException {
384385
} catch (FileNotFoundException fnfe) {
385386
// If the log was archived, continue reading from there
386387
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
388+
// archivedLog can be null if unable to locate in archiveDir.
387389
if (archivedLog != null) {
388390
openReader(archivedLog);
389391
} else {

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Collections;
2424
import java.util.Comparator;
2525
import java.util.List;
26-
import java.util.Objects;
2726
import java.util.concurrent.atomic.AtomicBoolean;
2827
import java.util.concurrent.locks.ReadWriteLock;
2928
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -469,36 +468,6 @@ public static boolean isArchivedLogFile(Path p) {
469468
return p.toString().contains(oldLog);
470469
}
471470

472-
/**
473-
* Get the archived WAL file path
474-
* @param path - active WAL file path
475-
* @param conf - configuration
476-
* @return archived path if exists, path - otherwise
477-
* @throws IOException exception
478-
*/
479-
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
480-
Path rootDir = CommonFSUtils.getWALRootDir(conf);
481-
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
482-
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
483-
ServerName serverName = getServerNameFromWALDirectoryName(path);
484-
if (serverName == null) {
485-
LOG.error("Couldn't locate log: " + path);
486-
return path;
487-
}
488-
oldLogDir = new Path(oldLogDir, serverName.getServerName());
489-
}
490-
Path archivedLogLocation = new Path(oldLogDir, path.getName());
491-
final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
492-
493-
if (fs.exists(archivedLogLocation)) {
494-
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
495-
return archivedLogLocation;
496-
} else {
497-
LOG.error("Couldn't locate log: " + path);
498-
return path;
499-
}
500-
}
501-
502471
/**
503472
* Find the archived WAL file path if it is not able to locate in WALs dir.
504473
* @param path - active WAL file path
@@ -531,7 +500,6 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
531500
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
532501
return archivedLogLocation;
533502
}
534-
535503
LOG.error("Couldn't locate log: " + path);
536504
return null;
537505
}
@@ -557,8 +525,9 @@ public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Confi
557525
return reader;
558526
} catch (FileNotFoundException fnfe) {
559527
// If the log was archived, continue reading from there
560-
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
561-
if (!Objects.equals(path, archivedLog)) {
528+
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
529+
// archivedLog can be null if unable to locate in archiveDir.
530+
if (archivedLog != null) {
562531
return openReader(archivedLog, conf);
563532
} else {
564533
throw fnfe;

0 commit comments

Comments
 (0)