Skip to content

Commit d0ca88c

Browse files
Brandon Licnauroth
authored andcommitted
HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write. Contributed by Brandon Li
(cherry picked from commit 99d9d0c) Conflicts: hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (cherry picked from commit 0c3b788) Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
1 parent 1535e72 commit d0ca88c

File tree

2 files changed

+144
-52
lines changed

2 files changed

+144
-52
lines changed

hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java

Lines changed: 71 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,42 @@ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
818818
return ret;
819819
}
820820

821+
// Check if the to-commit range is sequential
822+
@VisibleForTesting
823+
synchronized boolean checkSequential(final long commitOffset,
824+
final long nextOffset) {
825+
Preconditions.checkState(commitOffset >= nextOffset, "commitOffset "
826+
+ commitOffset + " less than nextOffset " + nextOffset);
827+
long offset = nextOffset;
828+
Iterator<OffsetRange> it = pendingWrites.descendingKeySet().iterator();
829+
while (it.hasNext()) {
830+
OffsetRange range = it.next();
831+
if (range.getMin() != offset) {
832+
// got a hole
833+
return false;
834+
}
835+
offset = range.getMax();
836+
if (offset > commitOffset) {
837+
return true;
838+
}
839+
}
840+
// there is gap between the last pending write and commitOffset
841+
return false;
842+
}
843+
844+
private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset,
845+
Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
846+
if (!fromRead) {
847+
// let client retry the same request, add pending commit to sync later
848+
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
849+
pendingCommits.put(commitOffset, commitCtx);
850+
}
851+
if (LOG.isDebugEnabled()) {
852+
LOG.debug("return COMMIT_SPECIAL_WAIT");
853+
}
854+
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
855+
}
856+
821857
@VisibleForTesting
822858
synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
823859
Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
@@ -829,11 +865,6 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
829865
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
830866
}
831867
}
832-
if (pendingWrites.isEmpty()) {
833-
// Note that, there is no guarantee data is synced. Caller should still
834-
// do a sync here though the output stream might be closed.
835-
return COMMIT_STATUS.COMMIT_FINISHED;
836-
}
837868

838869
long flushed = 0;
839870
try {
@@ -842,10 +873,33 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
842873
LOG.error("Can't get flushed offset, error:" + e);
843874
return COMMIT_STATUS.COMMIT_ERROR;
844875
}
876+
845877
if (LOG.isDebugEnabled()) {
846-
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
878+
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset
879+
+ "nextOffset=" + nextOffset.get());
847880
}
848-
881+
882+
if (pendingWrites.isEmpty()) {
883+
if (aixCompatMode) {
884+
// Note that, there is no guarantee data is synced. Caller should still
885+
// do a sync here though the output stream might be closed.
886+
return COMMIT_STATUS.COMMIT_FINISHED;
887+
} else {
888+
if (flushed < nextOffset.get()) {
889+
if (LOG.isDebugEnabled()) {
890+
LOG.debug("get commit while still writing to the requested offset,"
891+
+ " with empty queue");
892+
}
893+
return handleSpecialWait(fromRead, nextOffset.get(), channel, xid,
894+
preOpAttr);
895+
} else {
896+
return COMMIT_STATUS.COMMIT_FINISHED;
897+
}
898+
}
899+
}
900+
901+
Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed
902+
+ " is larger than nextOffset " + nextOffset.get());
849903
// Handle large file upload
850904
if (uploadLargeFile && !aixCompatMode) {
851905
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
@@ -854,21 +908,20 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
854908
if (co <= flushed) {
855909
return COMMIT_STATUS.COMMIT_DO_SYNC;
856910
} else if (co < nextOffset.get()) {
857-
if (!fromRead) {
858-
// let client retry the same request, add pending commit to sync later
859-
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
860-
preOpAttr);
861-
pendingCommits.put(commitOffset, commitCtx);
862-
}
863911
if (LOG.isDebugEnabled()) {
864-
LOG.debug("return COMMIT_SPECIAL_WAIT");
912+
LOG.debug("get commit while still writing to the requested offset");
865913
}
866-
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
914+
return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
867915
} else {
868-
if (LOG.isDebugEnabled()) {
869-
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
916+
// co >= nextOffset
917+
if (checkSequential(co, nextOffset.get())) {
918+
return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
919+
} else {
920+
if (LOG.isDebugEnabled()) {
921+
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
922+
}
923+
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
870924
}
871-
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
872925
}
873926
}
874927

hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -217,50 +217,55 @@ public void testCheckCommitLargeFileUpload() throws IOException {
217217
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
218218
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
219219

220-
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
220+
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
221221
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
222222
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
223223
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
224224

225225
// Test request with non zero commit offset
226226
ctx.setActiveStatusForTest(true);
227-
Mockito.when(fos.getPos()).thenReturn((long) 10);
227+
Mockito.when(fos.getPos()).thenReturn((long) 8);
228228
ctx.setNextOffsetForTest(10);
229229
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
230230
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
231231
// Do_SYNC state will be updated to FINISHED after data sync
232232
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
233233
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
234234

235+
// Test commit sequential writes
235236
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
236-
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
237+
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
237238
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
238-
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
239+
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
239240

241+
// Test commit non-sequential writes
240242
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
241243
.getPendingCommitsForTest();
242-
Assert.assertTrue(commits.size() == 0);
243-
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
244+
Assert.assertTrue(commits.size() == 1);
245+
ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
244246
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
245-
Assert.assertTrue(commits.size() == 0);
247+
Assert.assertTrue(commits.size() == 1);
246248

247249
// Test request with zero commit offset
248-
commits.remove(new Long(11));
249-
// There is one pending write [5,10]
250+
commits.remove(new Long(10));
251+
// There is one pending write [10,15]
250252
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
251-
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
253+
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
252254

253-
Mockito.when(fos.getPos()).thenReturn((long) 6);
254-
ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
255+
ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
255256
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
256-
Assert.assertTrue(commits.size() == 1);
257-
long key = commits.firstKey();
258-
Assert.assertTrue(key == 8);
257+
Assert.assertTrue(commits.size() == 2);
259258

259+
// Empty pending writes. nextOffset=10, flushed pos=8
260+
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
261+
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
262+
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
263+
260264
// Empty pending writes
261-
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
265+
ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
262266
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
263267
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
268+
264269
}
265270

266271
@Test
@@ -286,6 +291,7 @@ public void testCheckCommitAixCompatMode() throws IOException {
286291
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
287292
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
288293
Mockito.when(fos.getPos()).thenReturn((long) 10);
294+
ctx.setNextOffsetForTest((long)10);
289295
status = ctx.checkCommitInternal(5, null, 1, attr, false);
290296
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
291297
}
@@ -317,7 +323,7 @@ public void testCheckCommitFromRead() throws IOException {
317323
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
318324
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
319325

320-
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
326+
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
321327
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
322328
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
323329
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
@@ -326,6 +332,7 @@ public void testCheckCommitFromRead() throws IOException {
326332
// Test request with non zero commit offset
327333
ctx.setActiveStatusForTest(true);
328334
Mockito.when(fos.getPos()).thenReturn((long) 10);
335+
ctx.setNextOffsetForTest((long)10);
329336
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
330337
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
331338
// Do_SYNC state will be updated to FINISHED after data sync
@@ -355,7 +362,7 @@ public void testCheckCommitFromRead() throws IOException {
355362
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
356363

357364
// Empty pending writes
358-
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
365+
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
359366
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
360367
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
361368
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
@@ -386,48 +393,51 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException {
386393
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
387394
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
388395

389-
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
396+
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
390397
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
391398
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
392399
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
393400
assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));
394401

395402
// Test request with non zero commit offset
396403
ctx.setActiveStatusForTest(true);
397-
Mockito.when(fos.getPos()).thenReturn((long) 10);
404+
Mockito.when(fos.getPos()).thenReturn((long) 6);
405+
ctx.setNextOffsetForTest((long)10);
398406
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
399407
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
400408
// Do_SYNC state will be updated to FINISHED after data sync
401409
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
402410
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
403411
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
404412

405-
status = ctx.checkCommitInternal(10, ch, 1, attr, true);
406-
assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
407-
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
408-
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
409-
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
410-
413+
// Test request with sequential writes
414+
status = ctx.checkCommitInternal(9, ch, 1, attr, true);
415+
assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
416+
ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
417+
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
418+
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));
419+
420+
// Test request with non-sequential writes
411421
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
412422
.getPendingCommitsForTest();
413423
assertTrue(commits.size() == 0);
414-
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
424+
ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
415425
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
416426
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
417-
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
427+
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));
418428

419429
// Test request with zero commit offset
420-
// There is one pending write [5,10]
430+
// There is one pending write [10,15]
421431
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
422-
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
432+
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
423433
assertEquals(0, commits.size());
424-
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
434+
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
425435

426436
// Empty pending writes
427-
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
437+
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
428438
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
429-
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
430-
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
439+
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
440+
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
431441
}
432442

433443
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
@@ -629,4 +639,33 @@ securityHandler, new InetSocketAddress("localhost", config.getInt(
629639
}
630640
}
631641
}
642+
643+
@Test
644+
public void testCheckSequential() throws IOException {
645+
DFSClient dfsClient = Mockito.mock(DFSClient.class);
646+
Nfs3FileAttributes attr = new Nfs3FileAttributes();
647+
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
648+
Mockito.when(fos.getPos()).thenReturn((long) 0);
649+
NfsConfiguration config = new NfsConfiguration();
650+
651+
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
652+
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
653+
new ShellBasedIdMapping(config), false, config);
654+
655+
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
656+
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
657+
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
658+
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
659+
ctx.getPendingWritesForTest().put(new OffsetRange(20, 25),
660+
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
661+
662+
assertTrue(!ctx.checkSequential(5, 4));
663+
assertTrue(ctx.checkSequential(9, 5));
664+
assertTrue(ctx.checkSequential(10, 5));
665+
assertTrue(ctx.checkSequential(14, 5));
666+
assertTrue(!ctx.checkSequential(15, 5));
667+
assertTrue(!ctx.checkSequential(20, 5));
668+
assertTrue(!ctx.checkSequential(25, 5));
669+
assertTrue(!ctx.checkSequential(999, 5));
670+
}
632671
}

0 commit comments

Comments
 (0)