Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2846,7 +2846,8 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " +
"flushsize=" + totalSizeOfFlushableStores;
status.setStatus(s);
doSyncOfUnflushedWALChanges(wal, getRegionInfo());

doSyncOfUnflushedWALChanges(status, wal, getRegionInfo());
return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
flushOpSeqId, flushedSeqId, totalSizeOfFlushableStores);
}
Expand Down Expand Up @@ -2903,17 +2904,27 @@ private void doAbortFlushToWAL(final WAL wal, final long flushOpSeqId,
/**
* Sync unflushed WAL changes. See HBASE-8208 for details
*/
private static void doSyncOfUnflushedWALChanges(final WAL wal, final RegionInfo hri)
throws IOException {
private void doSyncOfUnflushedWALChanges(MonitoredTask status, final WAL wal,
final RegionInfo hri) throws IOException {
if (wal == null) {
return;
}
try {
wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) {
wal.abortCacheFlush(hri.getEncodedNameAsBytes());
throw ioe;
}
int retry = 0;
int maxRetry = 3;
IOException lastIOE = null;
do {
try {
wal.sync(); // ensure that flush marker is sync'ed
return;
} catch (IOException ioe) {
retry++;
LOG.warn("Sync region " + hri + " unflushed WAL changes failed, retry time " + retry, ioe);
lastIOE = ioe;
}
} while (retry < maxRetry);
status.abort("Sync region " + hri + " unflushed WAL changes failed: " + StringUtils
.stringifyException(lastIOE));
fatalForFlushCache(lastIOE);
}

/**
Expand Down Expand Up @@ -3036,22 +3047,8 @@ FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
}
wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionInfo().getRegionName()), t);
status.abort("Flush failed: " + StringUtils.stringifyException(t));

// Callers for flushcache() should catch DroppedSnapshotException and abort the region server.
// However, since we may have the region read lock, we cannot call close(true) here since
// we cannot promote to a write lock. Instead we are setting closing so that all other region
// operations except for close will be rejected.
this.closing.set(true);

if (rsServices != null) {
// This is a safeguard against the case where the caller fails to explicitly handle aborting
rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
}

throw dse;
fatalForFlushCache(t);
}

// If we get to here, the HStores have been written.
Expand Down Expand Up @@ -3097,6 +3094,24 @@ FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
}

/**
 * Escalates an unrecoverable flush failure: wraps the cause in a
 * {@link DroppedSnapshotException}, marks this region as closing, asks the region server
 * services (when present) to abort, and then throws the wrapping exception.
 *
 * @param t the failure that made the flush unrecoverable; used as the exception cause
 * @throws DroppedSnapshotException always, carrying the binary-stringified region name in its
 *         message and {@code t} as its cause
 */
private void fatalForFlushCache(Throwable t) throws DroppedSnapshotException {
  final String regionName = Bytes.toStringBinary(getRegionInfo().getRegionName());
  final DroppedSnapshotException dropped =
      new DroppedSnapshotException("region: " + regionName, t);

  // Callers of flushcache() are expected to catch DroppedSnapshotException and abort the region
  // server. We may be holding the region read lock here, which cannot be promoted to a write
  // lock, so close(true) is not an option. Setting the closing flag instead makes every region
  // operation other than close get rejected.
  this.closing.set(true);

  // Safeguard for callers that fail to handle the abort explicitly.
  if (rsServices != null) {
    rsServices.abort("Replay of WAL required. Forcing server shutdown", dropped);
  }

  throw dropped;
}

/**
* Method to safely get the next sequence number.
* @return Next sequence number unassociated with any actual edit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1276,9 +1276,6 @@ public long getSyncedLength() {
try {
region.flush(true);
fail("This should have thrown exception");
} catch (DroppedSnapshotException unexpected) {
// this should not be a dropped snapshot exception. Meaning that RS will not abort
throw unexpected;
} catch (IOException expected) {
// expected
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,20 +535,6 @@ public void postLogRoll(Path oldFile, Path newFile) {
assertTrue(loggedRows.contains("row1004"));
assertTrue(loggedRows.contains("row1005"));

// flush all regions
for (HRegion r : server.getOnlineRegionsLocalContext()) {
try {
r.flush(true);
} catch (Exception e) {
// This try/catch was added by HBASE-14317. It is needed
// because this issue tightened up the semantic such that
// a failed append could not be followed by a successful
// sync. What is coming out here is a failed sync, a sync
// that used to 'pass'.
LOG.info(e.toString(), e);
}
}

ResultScanner scanner = table.getScanner(new Scan());
try {
for (int i = 2; i <= 5; i++) {
Expand Down