Skip to content

Commit 9ab0b15

Browse files
authored
HBASE-27211 Data race in MonitoredTaskImpl could cause split wal failure (#4630)
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: Xin Sun <[email protected]>
1 parent ff8eb59 commit 9ab0b15

File tree

10 files changed

+57
-98
lines changed

10 files changed

+57
-98
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,8 @@ public TakeSnapshotHandler(SnapshotDescription snapshot, final MasterServices ma
125125
// prepare the verify
126126
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, workingDirFs);
127127
// update the running tasks
128-
this.status = TaskMonitor.get()
129-
.createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
130-
this.status.enableStatusJournal(true);
128+
this.status = TaskMonitor.get().createStatus(
129+
"Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable, false, true);
131130
this.snapshotManifest =
132131
SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
133132
}

hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
4545
private Map<String, Object> callInfoMap = new HashMap<>();
4646

4747
public MonitoredRPCHandlerImpl() {
48-
super();
48+
super(false);
4949
// in this implementation, WAITING indicates that the handler is not
5050
// actively servicing an RPC call.
5151
setState(State.WAITING);
@@ -235,7 +235,7 @@ private Map<String, Object> generateCallInfoMap() {
235235
return map;
236236
}
237237
Map<String, Object> rpcJSON = new HashMap<>();
238-
ArrayList paramList = new ArrayList();
238+
ArrayList<Object> paramList = new ArrayList<>();
239239
map.put("rpcCall", rpcJSON);
240240
rpcJSON.put("queuetimems", getRPCQueueTime());
241241
rpcJSON.put("starttimems", getRPCStartTime());

hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,11 @@ public interface StatusJournalEntry {
6969

7070
void setWarnTime(final long t);
7171

72-
List<StatusJournalEntry> getStatusJournal();
73-
7472
/**
75-
* Enable journal that will store all statuses that have been set along with the time stamps when
76-
* they were set.
77-
* @param includeCurrentStatus whether to include the current set status in the journal
73+
* If journal is enabled, we will store all statuses that have been set along with the time stamps
74+
* when they were set. This method will give you all the journals stored so far.
7875
*/
79-
void enableStatusJournal(boolean includeCurrentStatus);
80-
81-
void disableStatusJournal();
76+
List<StatusJournalEntry> getStatusJournal();
8277

8378
String prettyPrintJournal();
8479

hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java

Lines changed: 25 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
package org.apache.hadoop.hbase.monitoring;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayList;
2221
import java.util.Collections;
2322
import java.util.HashMap;
23+
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
2627
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
2728
import org.apache.hadoop.hbase.util.GsonUtil;
2829
import org.apache.yetus.audience.InterfaceAudience;
2930

31+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
3032
import org.apache.hbase.thirdparty.com.google.gson.Gson;
3133

3234
@InterfaceAudience.Private
@@ -40,22 +42,25 @@ class MonitoredTaskImpl implements MonitoredTask {
4042
private volatile String description;
4143

4244
protected volatile State state = State.RUNNING;
43-
44-
private boolean journalEnabled = false;
45-
private List<StatusJournalEntry> journal;
45+
private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
4646

4747
private static final Gson GSON = GsonUtil.createGson().create();
4848

49-
public MonitoredTaskImpl() {
49+
public MonitoredTaskImpl(boolean enableJournal) {
5050
startTime = EnvironmentEdgeManager.currentTime();
5151
statusTime = startTime;
5252
stateTime = startTime;
5353
warnTime = startTime;
54+
if (enableJournal) {
55+
journal = new ConcurrentLinkedQueue<>();
56+
} else {
57+
journal = null;
58+
}
5459
}
5560

56-
private static class StatusJournalEntryImpl implements StatusJournalEntry {
57-
private long statusTime;
58-
private String status;
61+
private static final class StatusJournalEntryImpl implements StatusJournalEntry {
62+
private final long statusTime;
63+
private final String status;
5964

6065
public StatusJournalEntryImpl(String status, long statusTime) {
6166
this.status = status;
@@ -74,11 +79,7 @@ public long getTimeStamp() {
7479

7580
@Override
7681
public String toString() {
77-
StringBuilder sb = new StringBuilder();
78-
sb.append(status);
79-
sb.append(" at ");
80-
sb.append(statusTime);
81-
return sb.toString();
82+
return status + " at " + statusTime;
8283
}
8384
}
8485

@@ -162,7 +163,7 @@ public void abort(String msg) {
162163
public void setStatus(String status) {
163164
this.status = status;
164165
statusTime = EnvironmentEdgeManager.currentTime();
165-
if (journalEnabled) {
166+
if (journal != null) {
166167
journal.add(new StatusJournalEntryImpl(this.status, statusTime));
167168
}
168169
}
@@ -240,52 +241,29 @@ public List<StatusJournalEntry> getStatusJournal() {
240241
if (journal == null) {
241242
return Collections.emptyList();
242243
} else {
243-
return Collections.unmodifiableList(journal);
244+
return ImmutableList.copyOf(journal);
244245
}
245246
}
246247

247-
/**
248-
* Enables journaling of this monitored task, the first invocation will lazily initialize the
249-
* journal. The journal implementation itself and this method are not thread safe
250-
*/
251-
@Override
252-
public void enableStatusJournal(boolean includeCurrentStatus) {
253-
if (journalEnabled && journal != null) {
254-
return;
255-
}
256-
journalEnabled = true;
257-
if (journal == null) {
258-
journal = new ArrayList<StatusJournalEntry>();
259-
}
260-
if (includeCurrentStatus && status != null) {
261-
journal.add(new StatusJournalEntryImpl(status, statusTime));
262-
}
263-
}
264-
265-
@Override
266-
public void disableStatusJournal() {
267-
journalEnabled = false;
268-
}
269-
270248
@Override
271249
public String prettyPrintJournal() {
272-
if (!journalEnabled) {
250+
if (journal == null) {
273251
return "";
274252
}
275253
StringBuilder sb = new StringBuilder();
276-
for (int i = 0; i < journal.size(); i++) {
277-
StatusJournalEntry je = journal.get(i);
278-
sb.append(je.toString());
279-
if (i != 0) {
280-
StatusJournalEntry jep = journal.get(i - 1);
281-
long delta = je.getTimeStamp() - jep.getTimeStamp();
254+
Iterator<StatusJournalEntry> iter = journal.iterator();
255+
StatusJournalEntry previousEntry = null;
256+
while (iter.hasNext()) {
257+
StatusJournalEntry entry = iter.next();
258+
sb.append(entry);
259+
if (previousEntry != null) {
260+
long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
282261
if (delta != 0) {
283262
sb.append(" (+" + delta + " ms)");
284263
}
285264
}
286-
sb.append("\n");
265+
previousEntry = entry;
287266
}
288267
return sb.toString();
289268
}
290-
291269
}

hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class TaskMonitor {
5858
private final int maxTasks;
5959
private final long rpcWarnTime;
6060
private final long expirationTime;
61-
private final CircularFifoQueue tasks;
61+
private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
6262
private final List<TaskAndWeakRefPair> rpcTasks;
6363
private final long monitorInterval;
6464
private Thread monitorThread;
@@ -67,7 +67,7 @@ public class TaskMonitor {
6767
maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
6868
expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
6969
rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
70-
tasks = new CircularFifoQueue(maxTasks);
70+
tasks = new CircularFifoQueue<>(maxTasks);
7171
rpcTasks = Lists.newArrayList();
7272
monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
7373
monitorThread = new Thread(new MonitorRunnable());
@@ -84,12 +84,17 @@ public static synchronized TaskMonitor get() {
8484
return instance;
8585
}
8686

87-
public synchronized MonitoredTask createStatus(String description) {
87+
public MonitoredTask createStatus(String description) {
8888
return createStatus(description, false);
8989
}
9090

91-
public synchronized MonitoredTask createStatus(String description, boolean ignore) {
92-
MonitoredTask stat = new MonitoredTaskImpl();
91+
public MonitoredTask createStatus(String description, boolean ignore) {
92+
return createStatus(description, ignore, false);
93+
}
94+
95+
public synchronized MonitoredTask createStatus(String description, boolean ignore,
96+
boolean enableJournal) {
97+
MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
9398
stat.setDescription(description);
9499
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
95100
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,6 @@ public long initialize() throws IOException {
931931
* Initialize this region.
932932
* @param reporter Tickle every so often if initialize is taking a while.
933933
* @return What the next sequence (edit) id should be.
934-
* @throws IOException e
935934
*/
936935
long initialize(final CancelableProgressable reporter) throws IOException {
937936

@@ -941,8 +940,8 @@ long initialize(final CancelableProgressable reporter) throws IOException {
941940
+ " should have at least one column family.");
942941
}
943942

944-
MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
945-
status.enableStatusJournal(true);
943+
MonitoredTask status =
944+
TaskMonitor.get().createStatus("Initializing region " + this, false, true);
946945
long nextSeqId = -1;
947946
try {
948947
nextSeqId = initializeRegionInternals(reporter, status);
@@ -1596,8 +1595,7 @@ public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus)
15961595
// threads attempting to close will run up against each other.
15971596
MonitoredTask status = TaskMonitor.get().createStatus(
15981597
"Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""),
1599-
ignoreStatus);
1600-
status.enableStatusJournal(true);
1598+
ignoreStatus, true);
16011599
status.setStatus("Waiting for close lock");
16021600
try {
16031601
synchronized (closeLock) {
@@ -2318,7 +2316,6 @@ public boolean compact(CompactionContext compaction, HStore store,
23182316
}
23192317

23202318
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
2321-
status.enableStatusJournal(false);
23222319
if (this.closed.get()) {
23232320
String msg = "Skipping compaction on " + this + " because closed";
23242321
LOG.debug(msg);
@@ -2455,7 +2452,6 @@ public FlushResultImpl flushcache(List<byte[]> families, boolean writeFlushReque
24552452
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
24562453
}
24572454
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
2458-
status.enableStatusJournal(false);
24592455
status.setStatus("Acquiring readlock on region");
24602456
// block waiting for the lock for flushing cache
24612457
lock.readLock().lock();

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,8 @@ SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) thr
287287
boolean cancelled = false;
288288
int editsCount = 0;
289289
int editsSkipped = 0;
290-
MonitoredTask status =
291-
TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
292-
status.enableStatusJournal(true);
290+
MonitoredTask status = TaskMonitor.get()
291+
.createStatus("Splitting " + wal + " to temporary staging area.", false, true);
293292
Reader walReader = null;
294293
this.fileBeingSplit = walStatus;
295294
long startTS = EnvironmentEdgeManager.currentTime();

hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,23 +212,17 @@ public void testStatusJournal() {
212212
TaskMonitor tm = new TaskMonitor(new Configuration());
213213
MonitoredTask task = tm.createStatus("Test task");
214214
assertTrue(task.getStatusJournal().isEmpty());
215-
task.disableStatusJournal();
216215
task.setStatus("status1");
217216
// journal should be empty since it is disabled
218217
assertTrue(task.getStatusJournal().isEmpty());
219-
task.enableStatusJournal(true);
220-
// check existing status entered in journal
221-
assertEquals("status1", task.getStatusJournal().get(0).getStatus());
222-
assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
223-
task.disableStatusJournal();
218+
task = tm.createStatus("Test task with journal", false, true);
224219
task.setStatus("status2");
225-
// check status 2 not added since disabled
226-
assertEquals(1, task.getStatusJournal().size());
227-
task.enableStatusJournal(false);
228-
// size should still be 1 since we didn't include current status
229220
assertEquals(1, task.getStatusJournal().size());
221+
assertEquals("status2", task.getStatusJournal().get(0).getStatus());
230222
task.setStatus("status3");
223+
assertEquals(2, task.getStatusJournal().size());
231224
assertEquals("status3", task.getStatusJournal().get(1).getStatus());
225+
task.prettyPrintJournal();
232226
tm.shutdown();
233227
}
234228

hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.junit.Assert.assertFalse;
2222
import static org.junit.Assert.assertTrue;
2323
import static org.junit.Assert.fail;
24+
import static org.junit.Assume.assumeFalse;
2425

2526
import java.io.FileNotFoundException;
2627
import java.io.IOException;
@@ -945,6 +946,9 @@ public void testThreading() throws Exception {
945946
*/
946947
@Test
947948
public void testThreadingSlowWriterSmallBuffer() throws Exception {
949+
// The logic of this test has conflict with the limit writers split logic, skip this test for
950+
// TestWALSplitBoundedLogWriterCreation
951+
assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
948952
doTestThreading(200, 1024, 50);
949953
}
950954

hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919

2020
import org.apache.hadoop.hbase.HBaseClassTestRule;
2121
import org.apache.hadoop.hbase.testclassification.LargeTests;
22+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
2223
import org.junit.BeforeClass;
2324
import org.junit.ClassRule;
24-
import org.junit.Ignore;
25-
import org.junit.Test;
2625
import org.junit.experimental.categories.Category;
2726

28-
@Category(LargeTests.class)
27+
@Category({ RegionServerTests.class, LargeTests.class })
2928
public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
3029

3130
@ClassRule
@@ -37,14 +36,4 @@ public static void setUpBeforeClass() throws Exception {
3736
TestWALSplit.setUpBeforeClass();
3837
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
3938
}
40-
41-
/**
42-
* The logic of this test has conflict with the limit writers split logic, skip this test
43-
*/
44-
@Override
45-
@Test
46-
@Ignore
47-
public void testThreadingSlowWriterSmallBuffer() throws Exception {
48-
super.testThreadingSlowWriterSmallBuffer();
49-
}
5039
}

0 commit comments

Comments (0)