From ab37783cc702e3517a60c5278406bbd1d38d1b25 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 30 Aug 2023 00:44:26 -0700 Subject: [PATCH 1/8] Datablock close on uploadAsync complete --- .../org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 4268dc3f918a1..14b29a8a8d898 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -345,6 +345,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, return null; } finally { IOUtils.close(blockUploadData); + blockToUpload.close(); } }); writeOperations.add(new WriteOperation(job, offset, bytesLength)); From 50e774206c4cf1e69ca9b523222c95ad6f6fc7ab Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 31 Aug 2023 05:44:19 -0700 Subject: [PATCH 2/8] test --- .../fs/azurebfs/AzureBlobFileSystem.java | 2 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 7 ++++- .../ITestAzureBlobFileSystemAppend.java | 31 +++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 426ad8ca1e191..8f7fbc570221e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -330,7 +330,7 @@ public FSDataOutputStream create(final Path f, try { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener); - OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite, + OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite, permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()), tracingContext); statIncrement(FILES_CREATED); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 49dff3360899e..c6ab9db1bd9af 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -707,7 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) .withLease(lease) - .withBlockFactory(blockFactory) + .withBlockFactory(getBlockFactory()) .withBlockOutputActiveBlocks(blockOutputActiveBlocks) .withClient(client) .withPosition(position) @@ -1940,6 +1940,11 @@ void setClient(AbfsClient client) { this.client = client; } + @VisibleForTesting + protected DataBlocks.BlockFactory getBlockFactory() { + return blockFactory; + } + @VisibleForTesting void setNamespaceEnabled(Trilean isNamespaceEnabled){ this.isNamespaceEnabled = isNamespaceEnabled; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index dbe4b42a67df3..58386414a8f74 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -20,15 +20,21 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; import java.util.Random; import org.junit.Test; +import org.mockito.Mockito; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.store.BlockUploadStatistics; +import org.apache.hadoop.fs.store.DataBlocks; /** * Test append operations. @@ -90,4 +96,29 @@ public void testTracingForAppend() throws IOException { fs.getFileSystemId(), FSOperationType.APPEND, false, 0)); fs.append(testPath, 10); } + + @Test + public void testCloseOfDataBlockOnAppendComplete() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(configuration)); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; + Mockito.doAnswer(answer -> { + DataBlocks.BlockFactory factory = Mockito.spy( + (DataBlocks.BlockFactory) answer.callRealMethod()); + Mockito.doAnswer(factoryCreate -> { + dataBlock[0] = Mockito.spy( + (DataBlocks.DataBlock) factoryCreate.callRealMethod()); + return dataBlock[0]; + }).when(factory).create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any( + BlockUploadStatistics.class)); + return factory; + }).when(store).getBlockFactory(); + OutputStream os = fs.create(new Path("/file")); + os.write(new byte[1]); + os.close(); + Mockito.verify(dataBlock[0], Mockito.times(1)).close(); + } } From 7770dfa2aa9a6e910ce9fd020033e5490e401a3d Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 31 Aug 2023 05:50:20 -0700 Subject: [PATCH 3/8] renamed some variables --- .../fs/azurebfs/ITestAzureBlobFileSystemAppend.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 58386414a8f74..283f0d46c4bf0 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -105,12 +105,12 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception { AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); Mockito.doReturn(store).when(fs).getAbfsStore(); DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; - Mockito.doAnswer(answer -> { + Mockito.doAnswer(getBlobFactoryInvocation -> { DataBlocks.BlockFactory factory = Mockito.spy( - (DataBlocks.BlockFactory) answer.callRealMethod()); - Mockito.doAnswer(factoryCreate -> { + (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod()); + Mockito.doAnswer(factoryCreateInvocation -> { dataBlock[0] = Mockito.spy( - (DataBlocks.DataBlock) factoryCreate.callRealMethod()); + (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod()); return dataBlock[0]; }).when(factory).create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any( BlockUploadStatistics.class)); From 16455f884d3f32a34584166e93865ed36ccde040 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 31 Aug 2023 21:25:51 -0700 Subject: [PATCH 4/8] getBlockFactory to be package-protected --- .../org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index c6ab9db1bd9af..ff37ee5ee901e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1941,7 +1941,7 @@ void setClient(AbfsClient client) { } @VisibleForTesting - protected DataBlocks.BlockFactory getBlockFactory() { + DataBlocks.BlockFactory getBlockFactory() { return blockFactory; } From 1f8b7d8853f389be12a45ab745037abdbca30d4e Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Fri, 1 Sep 2023 05:25:21 -0700 Subject: [PATCH 5/8] made testCloseOfDataBlockOnAppendComplete exhaustive of datablock types --- .../ITestAzureBlobFileSystemAppend.java | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 283f0d46c4bf0..ffb81115d3d27 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -21,7 +21,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import org.junit.Test; import org.mockito.Mockito; @@ -36,6 +38,11 @@ import org.apache.hadoop.fs.store.BlockUploadStatistics; import org.apache.hadoop.fs.store.DataBlocks; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER; + /** * Test append operations. */ @@ -99,26 +106,35 @@ public void testTracingForAppend() throws IOException { @Test public void testCloseOfDataBlockOnAppendComplete() throws Exception { - Configuration configuration = new Configuration(getRawConfiguration()); - AzureBlobFileSystem fs = Mockito.spy( - (AzureBlobFileSystem) FileSystem.newInstance(configuration)); - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - Mockito.doReturn(store).when(fs).getAbfsStore(); - DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; - Mockito.doAnswer(getBlobFactoryInvocation -> { - DataBlocks.BlockFactory factory = Mockito.spy( - (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod()); - Mockito.doAnswer(factoryCreateInvocation -> { - dataBlock[0] = Mockito.spy( - (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod()); - return dataBlock[0]; - }).when(factory).create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any( - BlockUploadStatistics.class)); - return factory; - }).when(store).getBlockFactory(); - OutputStream os = fs.create(new Path("/file")); - os.write(new byte[1]); - os.close(); - Mockito.verify(dataBlock[0], Mockito.times(1)).close(); + Set blockBufferTypes = new HashSet<>(); + blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK); + blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER); + blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY); + for (String blockBufferType : blockBufferTypes) { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(DATA_BLOCKS_BUFFER, blockBufferType); + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(configuration)); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; + Mockito.doAnswer(getBlobFactoryInvocation -> { + DataBlocks.BlockFactory factory = Mockito.spy( + (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod()); + Mockito.doAnswer(factoryCreateInvocation -> { + dataBlock[0] = Mockito.spy( + (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod()); + return dataBlock[0]; + }) + .when(factory) + .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any( + BlockUploadStatistics.class)); + return factory; + }).when(store).getBlockFactory(); + OutputStream os = fs.create(new Path("/file_" + blockBufferType)); + os.write(new byte[1]); + os.close(); + Mockito.verify(dataBlock[0], Mockito.times(1)).close(); + } } } From 0a87a03739dcf897d8337182a65fb4138f6a5905 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 13 Sep 2023 05:48:10 -0700 Subject: [PATCH 6/8] Datablock.getState() made public. --- .../src/main/java/org/apache/hadoop/fs/store/DataBlocks.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java index c70d0ee91e15e..265ece10f8149 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java @@ -375,7 +375,7 @@ protected final void verifyState(DestState expected) * * @return the current state. */ - final DestState getState() { + public final DestState getState() { return state; } From 96aca09f64171f674e824b29845e00190bbf9f06 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 13 Sep 2023 05:58:08 -0700 Subject: [PATCH 7/8] DestState made public --- .../src/main/java/org/apache/hadoop/fs/store/DataBlocks.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java index 265ece10f8149..0ae9ee6378b57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java @@ -329,7 +329,7 @@ public String getKeyToBufferDir() { */ public static abstract class DataBlock implements Closeable { - enum DestState {Writing, Upload, Closed} + public enum DestState {Writing, Upload, Closed} private volatile DestState state = Writing; private final long index; From d9da57b16ea00f4e79444a5109807ac6fef26e6c Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 13 Sep 2023 06:02:03 -0700 Subject: [PATCH 8/8] close with IoUtils; test refactors; --- .../azurebfs/services/AbfsOutputStream.java | 3 +-- .../ITestAzureBlobFileSystemAppend.java | 20 +++++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 14b29a8a8d898..89b0fe2040f6e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -344,8 +344,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, outputStreamStatistics.uploadSuccessful(bytesLength); return null; } finally { - IOUtils.close(blockUploadData); - blockToUpload.close(); + IOUtils.close(blockUploadData, blockToUpload); } }); writeOperations.add(new WriteOperation(job, offset, bytesLength)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index ffb81115d3d27..7d182f936b7bb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -25,6 +25,7 @@ import java.util.Random; import java.util.Set; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.mockito.Mockito; @@ -42,6 +43,8 @@ import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY; import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK; import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; /** * Test append operations. @@ -131,10 +134,19 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception { BlockUploadStatistics.class)); return factory; }).when(store).getBlockFactory(); - OutputStream os = fs.create(new Path("/file_" + blockBufferType)); - os.write(new byte[1]); - os.close(); - Mockito.verify(dataBlock[0], Mockito.times(1)).close(); + try (OutputStream os = fs.create( + new Path(getMethodName() + "_" + blockBufferType))) { + os.write(new byte[1]); + Assertions.assertThat(dataBlock[0].getState()) + .describedAs( + "On write of data in outputStream, state should become Writing") + .isEqualTo(Writing); + os.close(); + Mockito.verify(dataBlock[0], Mockito.times(1)).close(); + Assertions.assertThat(dataBlock[0].getState()) + .describedAs("On close of outputStream, state should become Closed") + .isEqualTo(Closed); + } } } }