From 8f679e374594576d84a7b1d4640f23f4621d6ffb Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Tue, 30 Jan 2024 13:50:49 -0800 Subject: [PATCH 01/17] [HDFS-17299] Adding rack failure tolerance when creating a new file --- .../org/apache/hadoop/hdfs/DataStreamer.java | 61 +++++--- .../hdfs/server/datanode/BlockReceiver.java | 5 +- .../datanode/fsdataset/FsDatasetSpi.java | 10 ++ .../fsdataset/impl/FsDatasetImpl.java | 19 ++- .../hdfs/TestDistributedFileSystem.java | 132 ++++++++++++++++++ .../server/datanode/SimulatedFSDataset.java | 5 + .../extdataset/ExternalDatasetImpl.java | 5 + 7 files changed, 215 insertions(+), 22 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index b313a8737fab0..64f9603660a28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1627,7 +1627,6 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes, StorageType[] nodeStorageTypes, String[] nodeStorageIDs) throws IOException { boolean success = false; - long newGS = 0L; while (!success && !streamerClosed && dfsClient.clientRunning) { if (!handleRestartingDatanode()) { return; @@ -1639,24 +1638,29 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes, } handleDatanodeReplacement(); - - // get a new generation stamp and an access token - final LocatedBlock lb = updateBlockForPipeline(); - newGS = lb.getBlock().getGenerationStamp(); - accessToken = lb.getBlockToken(); - - // set up the pipeline again with the remaining nodes - success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, - isRecovery); + success = updateBlockAndCreateBlockOutputStream(isRecovery, false); failPacket4Testing(); errorState.checkRestartingNodeDeadline(nodes); + } // while + } + private boolean updateBlockAndCreateBlockOutputStream(boolean isRecovery, boolean setCurrentBlock) throws IOException { + final LocatedBlock lb = updateBlockForPipeline(); + long newGS = lb.getBlock().getGenerationStamp(); + accessToken = lb.getBlockToken(); + if(setCurrentBlock) { + block.setCurrentBlock(lb.getBlock()); + } + // set up the pipeline again with the remaining nodes + final boolean success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, + isRecovery); if (success) { updatePipeline(newGS); } + return success; } /** @@ -1691,11 +1695,16 @@ boolean handleRestartingDatanode() { return true; } + + /** * Remove bad node from list of nodes if badNodeIndex was set. * @return true if it should continue. 
*/ boolean handleBadDatanode() { + return this.handleBadDatanodeInternal(this.nodes, this.storageTypes, this.storageIDs); + } + private boolean handleBadDatanodeInternal(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) { final int badNodeIndex = errorState.getBadNodeIndex(); if (badNodeIndex >= 0) { if (nodes.length <= 1) { @@ -1823,13 +1832,18 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { 0L, false); if (!success) { - LOG.warn("Abandoning " + block); - dfsClient.namenode.abandonBlock(block.getCurrentBlock(), - stat.getFileId(), src, dfsClient.clientName); - block.setCurrentBlock(null); - final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; - LOG.warn("Excluding datanode " + badNode); - excludedNodes.put(badNode, badNode); + while (!success && handleBadDatanodeInternal(nodes, nextStorageTypes, nextStorageIDs) + && dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 + && this.nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { + LOG.info("Proceeding to create block {} after excluding bad datanode from pipeline", this); + success = updateBlockAndCreateBlockOutputStream(false, true); + nodes = this.nodes; + nextStorageTypes = this.storageTypes; + nextStorageIDs = this.storageIDs; + } + if(!success) { + handleBlockCreationFailure(); + } } } while (!success && --count >= 0); @@ -1839,6 +1853,19 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { return lb; } + private void handleBlockCreationFailure() throws IOException { + if(block != null) { + dfsClient.namenode.abandonBlock(block.getCurrentBlock(), + stat.getFileId(), src, dfsClient.clientName); + block.setCurrentBlock(null); + if(nodes != null && errorState.hasError()) { + final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; + LOG.warn("Excluding datanode " + badNode); + excludedNodes.put(badNode, badNode); + } + } + } + // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. 
// diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 38b03f8d6a24d..c014201e48d11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -218,7 +218,10 @@ class BlockReceiver implements Closeable { switch (stage) { case PIPELINE_SETUP_CREATE: replicaHandler = datanode.data.createRbw(storageType, storageId, - block, allowLazyPersist); + block, allowLazyPersist, newGs); + if(newGs != 0) { + block.setGenerationStamp(newGs); + } datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 4ab7e1be84523..8cf1daf841623 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -335,6 +335,16 @@ ReplicaHandler createTemporary(StorageType storageType, String storageId, ReplicaHandler createRbw(StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException; + /** + * Creates a RBW replica and returns the meta info of the replica + * + * @param b block + * @return the meta info of the replica which is being written to + * @throws IOException if an error occurs + */ + ReplicaHandler createRbw(StorageType storageType, String storageId, + ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException; + /** * Recovers a RBW replica and returns the meta info of the replica. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index b1526c9860e94..8cce616ae5c35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1583,19 +1583,30 @@ public Replica recoverClose(ExtendedBlock b, long newGS, } } + @Override // FsDatasetSpi + public ReplicaHandler createRbw( + StorageType storageType, String storageId, ExtendedBlock b, + boolean allowLazyPersist) throws IOException { + return createRbw(storageType, storageId, b, allowLazyPersist, 0L); + } + @Override // FsDatasetSpi public ReplicaHandler createRbw( StorageType storageType, String storageId, ExtendedBlock b, - boolean allowLazyPersist) throws IOException { + boolean allowLazyPersist, long newGS) throws IOException { long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, b.getBlockPoolId())) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { - throw new ReplicaAlreadyExistsException("Block " + b + - " already exists in state " + replicaInfo.getState() + - " and thus cannot be created."); + if(newGS != 0L) { + cleanupReplica(b.getBlockPoolId(), replicaInfo); + } else { + throw new ReplicaAlreadyExistsException("Block " + b + + " already exists in state " + replicaInfo.getState() + + " and thus cannot be created."); + } } // create a new block FsVolumeReference ref = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 8eb048c14235c..789863ec8daca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -108,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -2651,5 +2653,135 @@ public void testNameNodeCreateSnapshotTrashRootOnStartup() } } + @Test + public void testSingleRackFailureDuringPipelineSetup_MinReplicationPossible() throws Exception { + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, 2); + // 3 racks & 3 nodes. 
1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // kill one DN, so only 2 racks stays with active DN + cluster.stopDataNode(0); + // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks. + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + } + } + + @Test + public void testSingleRackFailureDuringPipelineSetup_MinReplicationImpossible() throws Exception { + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, 3); + // 3 racks & 3 nodes. 1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // kill one DN, so only 2 racks stays with active DN + cluster.stopDataNode(0); + boolean threw = false; + try { + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + } catch (IOException e) { + // success + threw = true; + } + assertTrue("Failed to throw IOE when creating a file with less DNs than required for min replication", threw); + } + } + + @Test + public void testMultipleRackFailureDuringPipelineSetup_MinReplicationPossible() throws Exception { + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, 1); + // 3 racks & 3 nodes. 1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // kill 2 DN, so only 1 racks stays with active DN + cluster.stopDataNode(0); + cluster.stopDataNode(1); + // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks. + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + } + } + + @Test + public void testMultipleRackFailureDuringPipelineSetup_MinReplicationImpossible() throws Exception { + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, 2); + // 3 racks & 3 nodes. 
1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // kill 2 DN, so only 1 racks stays with active DN + cluster.stopDataNode(0); + cluster.stopDataNode(1); + boolean threw = false; + try { + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + } catch (IOException e) { + // success + threw = true; + } + assertTrue("Failed to throw IOE when creating a file with less DNs than required for min replication", threw); + } + } + + @Test + public void testAllRackFailureDuringPipelineSetup() throws Exception { + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + // 3 racks & 3 nodes. 1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // shutdown all DNs + cluster.shutdownDataNodes(); + // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks + // but fail because no DNs are present. + boolean threw = false; + try { + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + } catch (IOException e) { + // success + threw = true; + } + assertTrue("Failed to throw IOE when creating a file with no DNs", threw); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 5421393c9e675..9c67dedf37f28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1204,6 +1204,11 @@ public synchronized ReplicaHandler createRbw( return createTemporary(storageType, storageId, b, false); } + @Override + public ReplicaHandler createRbw(StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException { + return createRbw(storageType, storageId, b, allowLazyPersist); + } + @Override // FsDatasetSpi public synchronized ReplicaHandler createTemporary(StorageType storageType, String storageId, ExtendedBlock b, boolean isTransfer) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 86d4319913301..ec24d42530a4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -153,6 +153,11 @@ public ReplicaHandler createRbw(StorageType storageType, String id, return new ReplicaHandler(new ExternalReplicaInPipeline(), null); } + @Override + public ReplicaHandler createRbw(StorageType storageType, String 
storageId, ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException { + return createRbw(storageType, storageId, b, allowLazyPersist); + } + @Override public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { From adcfb92b8450ed915fd36e2344647b6cd5c4077f Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Tue, 30 Jan 2024 18:47:17 -0800 Subject: [PATCH 02/17] [HDFS-17299] Adding rack failure tolerance when creating a new file --- .../org/apache/hadoop/hdfs/DataStreamer.java | 36 +++++++++---------- .../hdfs/server/datanode/BlockReceiver.java | 2 +- .../datanode/fsdataset/FsDatasetSpi.java | 2 +- .../fsdataset/impl/FsDatasetImpl.java | 6 ++-- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 64f9603660a28..700d7f132fa32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1651,16 +1651,16 @@ private boolean updateBlockAndCreateBlockOutputStream(boolean isRecovery, boolea final LocatedBlock lb = updateBlockForPipeline(); long newGS = lb.getBlock().getGenerationStamp(); accessToken = lb.getBlockToken(); - if(setCurrentBlock) { + if (setCurrentBlock) { block.setCurrentBlock(lb.getBlock()); } // set up the pipeline again with the remaining nodes final boolean success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, - isRecovery); + isRecovery); if (success) { updatePipeline(newGS); } - return success; + return success; } /** @@ -1695,21 +1695,19 @@ boolean handleRestartingDatanode() { return true; } - - /** * Remove bad node from list of nodes if badNodeIndex was set. * @return true if it should continue. */ boolean handleBadDatanode() { - return this.handleBadDatanodeInternal(this.nodes, this.storageTypes, this.storageIDs); + return handleBadDatanodeInternal(this.nodes, this.storageTypes, this.storageIDs); } private boolean handleBadDatanodeInternal(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) { final int badNodeIndex = errorState.getBadNodeIndex(); if (badNodeIndex >= 0) { if (nodes.length <= 1) { lastException.set(new IOException("All datanodes " - + Arrays.toString(nodes) + " are bad. Aborting...")); + + Arrays.toString(nodes) + " are bad. 
Aborting...")); streamerClosed = true; return false; } @@ -1832,16 +1830,16 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { 0L, false); if (!success) { - while (!success && handleBadDatanodeInternal(nodes, nextStorageTypes, nextStorageIDs) - && dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 - && this.nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { - LOG.info("Proceeding to create block {} after excluding bad datanode from pipeline", this); - success = updateBlockAndCreateBlockOutputStream(false, true); - nodes = this.nodes; - nextStorageTypes = this.storageTypes; - nextStorageIDs = this.storageIDs; + while (!success && handleBadDatanodeInternal(nodes, nextStorageTypes, nextStorageIDs) && + dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && + this.nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { + LOG.info("Proceeding to create block {} after excluding bad datanode from pipeline", this); + success = updateBlockAndCreateBlockOutputStream(false, true); + nodes = this.nodes; + nextStorageTypes = this.storageTypes; + nextStorageIDs = this.storageIDs; } - if(!success) { + if (!success) { handleBlockCreationFailure(); } } @@ -1854,11 +1852,11 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { } private void handleBlockCreationFailure() throws IOException { - if(block != null) { + if (block != null) { dfsClient.namenode.abandonBlock(block.getCurrentBlock(), - stat.getFileId(), src, dfsClient.clientName); + stat.getFileId(), src, dfsClient.clientName); block.setCurrentBlock(null); - if(nodes != null && errorState.hasError()) { + if (nodes != null && errorState.hasError()) { final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index c014201e48d11..bf973f2822326 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -219,7 +219,7 @@ class BlockReceiver implements Closeable { case PIPELINE_SETUP_CREATE: replicaHandler = datanode.data.createRbw(storageType, storageId, block, allowLazyPersist, newGs); - if(newGs != 0) { + if (newGs != 0) { block.setGenerationStamp(newGs); } datanode.notifyNamenodeReceivingBlock( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 8cf1daf841623..a128ba193afd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -343,7 +343,7 @@ ReplicaHandler createRbw(StorageType storageType, String storageId, * @throws IOException if an error occurs */ ReplicaHandler createRbw(StorageType storageType, String storageId, - ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException; + ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException; /** * Recovers a RBW replica and returns the meta info of the 
replica. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8cce616ae5c35..636769145a8f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1585,8 +1585,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS, @Override // FsDatasetSpi public ReplicaHandler createRbw( - StorageType storageType, String storageId, ExtendedBlock b, - boolean allowLazyPersist) throws IOException { + StorageType storageType, String storageId, ExtendedBlock b, + boolean allowLazyPersist) throws IOException { return createRbw(storageType, storageId, b, allowLazyPersist, 0L); } @@ -1600,7 +1600,7 @@ public ReplicaHandler createRbw( ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { - if(newGS != 0L) { + if (newGS != 0L) { cleanupReplica(b.getBlockPoolId(), replicaInfo); } else { throw new ReplicaAlreadyExistsException("Block " + b + From adab84b1abc231c2b4d47d9747c4990c80dfe16b Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Wed, 31 Jan 2024 16:50:01 -0800 Subject: [PATCH 03/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Fixing test failures --- .../org/apache/hadoop/hdfs/DataStreamer.java | 29 ++-- .../hdfs/server/datanode/BlockReceiver.java | 2 +- .../hdfs/TestDistributedFileSystem.java | 142 +++++++++--------- 3 files changed, 94 insertions(+), 79 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 700d7f132fa32..77fb6b00aa1b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1647,7 +1647,8 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes, } // while } - private boolean updateBlockAndCreateBlockOutputStream(boolean isRecovery, boolean setCurrentBlock) throws IOException { + private boolean updateBlockAndCreateBlockOutputStream(boolean isRecovery, boolean setCurrentBlock) + throws IOException { final LocatedBlock lb = updateBlockForPipeline(); long newGS = lb.getBlock().getGenerationStamp(); accessToken = lb.getBlockToken(); @@ -1830,17 +1831,25 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { 0L, false); if (!success) { - while (!success && handleBadDatanodeInternal(nodes, nextStorageTypes, nextStorageIDs) && - dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && - this.nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { - LOG.info("Proceeding to create block {} after excluding bad datanode from pipeline", this); - success = updateBlockAndCreateBlockOutputStream(false, true); - nodes = this.nodes; - nextStorageTypes = this.storageTypes; - nextStorageIDs = this.storageIDs; + while (!success && dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && + nodes.length - 1 >= dfsClient.dtpReplaceDatanodeOnFailureReplication && + handleBadDatanodeInternal(nodes, nextStorageTypes, nextStorageIDs)) { + LOG.info("Proceeding to create block {} 
after excluding bad datanode from pipeline", this); + success = updateBlockAndCreateBlockOutputStream(false, true); + nodes = this.nodes; + nextStorageTypes = this.storageTypes; + nextStorageIDs = this.storageIDs; } if (!success) { - handleBlockCreationFailure(); + LOG.warn("Abandoning " + block); + dfsClient.namenode.abandonBlock(block.getCurrentBlock(), stat.getFileId(), src, dfsClient.clientName); + block.setCurrentBlock(null); + final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; + LOG.warn("Excluding datanode " + badNode); + excludedNodes.put(badNode, badNode); + if (this.nodes != null) { + setPipeline(null, null, null); + } } } } while (!success && --count >= 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index bf973f2822326..1d34c773e6259 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -219,7 +219,7 @@ class BlockReceiver implements Closeable { case PIPELINE_SETUP_CREATE: replicaHandler = datanode.data.createRbw(storageType, storageId, block, allowLazyPersist, newGs); - if (newGs != 0) { + if (newGs != 0L) { block.setGenerationStamp(newGs); } datanode.notifyNamenodeReceivingBlock( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 789863ec8daca..55f578a4ca6a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -2654,7 +2654,7 @@ public void testNameNodeCreateSnapshotTrashRootOnStartup() } @Test - public void testSingleRackFailureDuringPipelineSetup_MinReplicationPossible() throws Exception { + public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception { Configuration conf = getTestConfiguration(); conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class); @@ -2662,32 +2662,33 @@ public void testSingleRackFailureDuringPipelineSetup_MinReplicationPossible() th HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false); conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. - MIN_REPLICATION, 2); + MIN_REPLICATION, 2); // 3 racks & 3 nodes. 1 per rack - try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { - cluster.waitClusterUp(); - DistributedFileSystem fs = cluster.getFileSystem(); - // kill one DN, so only 2 racks stays with active DN - cluster.stopDataNode(0); - // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks. 
- DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); - } + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // kill one DN, so only 2 racks stays with active DN + cluster.stopDataNode(0); + // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks. + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + cluster.shutdown(true); + } } @Test - public void testSingleRackFailureDuringPipelineSetup_MinReplicationImpossible() throws Exception { + public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible() throws Exception { Configuration conf = getTestConfiguration(); conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, - BlockPlacementPolicy.class); + BlockPlacementPolicy.class); conf.setBoolean( - HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, - false); + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. - MIN_REPLICATION, 3); + MIN_REPLICATION, 3); // 3 racks & 3 nodes. 1 per rack try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { cluster.waitClusterUp(); DistributedFileSystem fs = cluster.getFileSystem(); // kill one DN, so only 2 racks stays with active DN @@ -2698,50 +2699,53 @@ public void testSingleRackFailureDuringPipelineSetup_MinReplicationImpossible() } catch (IOException e) { // success threw = true; + } finally { + cluster.shutdown(true); } assertTrue("Failed to throw IOE when creating a file with less DNs than required for min replication", threw); } } @Test - public void testMultipleRackFailureDuringPipelineSetup_MinReplicationPossible() throws Exception { - Configuration conf = getTestConfiguration(); - conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, - BlockPlacementPolicy.class); - conf.setBoolean( - HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, - false); - conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. - MIN_REPLICATION, 1); - // 3 racks & 3 nodes. 1 per rack - try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { - cluster.waitClusterUp(); - DistributedFileSystem fs = cluster.getFileSystem(); - // kill 2 DN, so only 1 racks stays with active DN - cluster.stopDataNode(0); - cluster.stopDataNode(1); - // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks. - DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); - } + public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception { + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, 1); + // 3 racks & 3 nodes. 
1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // kill 2 DN, so only 1 racks stays with active DN + cluster.stopDataNode(0); + cluster.stopDataNode(1); + // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks. + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + cluster.shutdown(true); + } } @Test - public void testMultipleRackFailureDuringPipelineSetup_MinReplicationImpossible() throws Exception { + public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible() throws Exception { Configuration conf = getTestConfiguration(); conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, - BlockPlacementPolicy.class); + BlockPlacementPolicy.class); conf.setBoolean( - HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, - false); + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. - MIN_REPLICATION, 2); + MIN_REPLICATION, 2); // 3 racks & 3 nodes. 1 per rack try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { cluster.waitClusterUp(); DistributedFileSystem fs = cluster.getFileSystem(); - // kill 2 DN, so only 1 racks stays with active DN + // kill 2 DN, so only 1 rack stays with active DN cluster.stopDataNode(0); cluster.stopDataNode(1); boolean threw = false; @@ -2751,37 +2755,39 @@ public void testMultipleRackFailureDuringPipelineSetup_MinReplicationImpossible( // success threw = true; } + cluster.shutdown(true); assertTrue("Failed to throw IOE when creating a file with less DNs than required for min replication", threw); } } @Test public void testAllRackFailureDuringPipelineSetup() throws Exception { - Configuration conf = getTestConfiguration(); - conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, - BlockPlacementPolicy.class); - conf.setBoolean( - HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, - false); - // 3 racks & 3 nodes. 1 per rack - try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { - cluster.waitClusterUp(); - DistributedFileSystem fs = cluster.getFileSystem(); - // shutdown all DNs - cluster.shutdownDataNodes(); - // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks - // but fail because no DNs are present. - boolean threw = false; - try { - DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); - } catch (IOException e) { - // success - threw = true; - } - assertTrue("Failed to throw IOE when creating a file with no DNs", threw); + Configuration conf = getTestConfiguration(); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class, + BlockPlacementPolicy.class); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + false); + // 3 racks & 3 nodes. 
1 per rack + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + // shutdown all DNs + cluster.shutdownDataNodes(); + // create a file with replication 3, for rack fault tolerant BPP, it should allocate nodes in all 3 racks + // but fail because no DNs are present. + boolean threw = false; + try { + DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L); + } catch (IOException e) { + // success + threw = true; + } finally { + cluster.shutdown(true); } + assertTrue("Failed to throw IOE when creating a file with no DNs", threw); + } } - -} +} \ No newline at end of file From 5778b0cdd0c4d72fcec4999e50a044f8f0f8259e Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Wed, 31 Jan 2024 18:24:58 -0800 Subject: [PATCH 04/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Fixing test failures --- .../java/org/apache/hadoop/hdfs/DataStreamer.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 77fb6b00aa1b5..5061b59e744e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1860,19 +1860,6 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { return lb; } - private void handleBlockCreationFailure() throws IOException { - if (block != null) { - dfsClient.namenode.abandonBlock(block.getCurrentBlock(), - stat.getFileId(), src, dfsClient.clientName); - block.setCurrentBlock(null); - if (nodes != null && errorState.hasError()) { - final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; - LOG.warn("Excluding datanode " + badNode); - excludedNodes.put(badNode, badNode); - } - } - } - // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. // From 67d90ef3126a020b0aef640c19428811b80dda4d Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Wed, 31 Jan 2024 20:55:55 -0800 Subject: [PATCH 05/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Fixing test failures --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 5061b59e744e6..b85c8e3aa0fa7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1860,6 +1860,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { return lb; } + // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. 
// From a841d2f56cfc2e87346a7c51d7d3fb764265a5c8 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 1 Feb 2024 09:47:14 -0800 Subject: [PATCH 06/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Dummy Commit --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index b85c8e3aa0fa7..5061b59e744e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1860,7 +1860,6 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { return lb; } - // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. // From 94df2df7c4336d352956d4558789383f99f8e704 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 1 Feb 2024 14:55:28 -0800 Subject: [PATCH 07/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Refactoring --- .../org/apache/hadoop/hdfs/DataStreamer.java | 106 +++++++++--------- .../hadoop/hdfs/StripedDataStreamer.java | 12 +- 2 files changed, 57 insertions(+), 61 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 5061b59e744e6..2fc44b8376141 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -643,11 +643,11 @@ void setAccessToken(Token t) { this.accessToken = t; } - private void setPipeline(LocatedBlock lb) { + protected void setPipeline(LocatedBlock lb) { setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } - private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, + protected void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) { synchronized (nodesLock) { this.nodes = nodes; @@ -748,7 +748,7 @@ public void run() { if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { LOG.debug("Allocating new block: {}", this); - setPipeline(nextBlockOutputStream()); + setupPipelineForCreate(); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { LOG.debug("Append to block {}", block); @@ -966,8 +966,8 @@ void waitForAckedSeqno(long seqno) throws IOException { long duration = Time.monotonicNowNanos() - begin; if (TimeUnit.NANOSECONDS.toMillis(duration) > writeTimeout) { LOG.error("No ack received, took {}ms (threshold={}ms). " - + "File being written: {}, block: {}, " - + "Write pipeline datanodes: {}.", + + "File being written: {}, block: {}, " + + "Write pipeline datanodes: {}.", TimeUnit.NANOSECONDS.toMillis(duration), writeTimeout, src, block, nodes); throw new InterruptedIOException("No ack received after " + TimeUnit.NANOSECONDS.toSeconds(duration) + "s and a timeout of " + @@ -1525,8 +1525,7 @@ private void addDatanode2ExistingPipeline() throws IOException { // MIN_REPLICATION is set to 0 or less than zero, an exception will be // thrown if a replacement could not be found. 
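The min-replication decision that this hunk folds into checkMinReplicationSatisfied() is compact enough to state standalone. A runnable sketch of it, using illustrative names rather than DataStreamer's actual fields:

    // Sketch of the replace-datanode-on-failure decision described in the
    // comment above: after losing a datanode, keep writing with the survivors
    // only if the configured minimum replication is still met. Illustrative
    // stand-in for the dtpReplaceDatanodeOnFailureReplication check; not the
    // actual DataStreamer code.
    final class ReplaceDatanodeOnFailureCheck {
      private final int minReplication;

      ReplaceDatanodeOnFailureCheck(int minReplication) {
        this.minReplication = minReplication;
      }

      /** True to continue with the surviving nodes instead of failing the write. */
      boolean continueWithSurvivors(int survivingNodes) {
        return minReplication > 0 && survivingNodes >= minReplication;
      }

      public static void main(String[] args) {
        ReplaceDatanodeOnFailureCheck check = new ReplaceDatanodeOnFailureCheck(2);
        System.out.println(check.continueWithSurvivors(2)); // true: keep writing
        System.out.println(check.continueWithSurvivors(1)); // false: surface the error
      }
    }
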
- if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length - >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { + if (checkMinReplicationSatisfied()) { DFSClient.LOG.warn( "Failed to find a new datanode to add to the write pipeline," + " continue to write to the pipeline with " + nodes.length @@ -1608,7 +1607,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, * This happens when a file is appended or data streaming fails * It keeps on trying until a pipeline is setup */ - private void setupPipelineForAppendOrRecovery() throws IOException { + private boolean setupPipelineForAppendOrRecovery() throws IOException { // Check number of datanodes. Note that if there is no healthy datanode, // this must be internal error because we mark external error in striped // outputstream only when all the streamers are in the DATA_STREAMING stage @@ -1618,52 +1617,62 @@ private void setupPipelineForAppendOrRecovery() throws IOException { LOG.warn(msg); lastException.set(new IOException(msg)); streamerClosed = true; - return; + return false; } - setupPipelineInternal(nodes, storageTypes, storageIDs); + return setupPipelineInternal(nodes, storageTypes, storageIDs); } - protected void setupPipelineInternal(DatanodeInfo[] datanodes, + protected boolean setupPipelineInternal(DatanodeInfo[] datanodes, StorageType[] nodeStorageTypes, String[] nodeStorageIDs) throws IOException { boolean success = false; + long newGS = 0L; + boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage; while (!success && !streamerClosed && dfsClient.clientRunning) { if (!handleRestartingDatanode()) { - return; + return false; } - final boolean isRecovery = errorState.hasInternalError(); + final boolean isRecovery = errorState.hasInternalError() && !isCreateStage; if (!handleBadDatanode()) { - return; + return false; } handleDatanodeReplacement(); - success = updateBlockAndCreateBlockOutputStream(isRecovery, false); + + if (isCreateStage && !checkMinReplicationSatisfied()) { + return false; + } + + // get a new generation stamp and an access token + final LocatedBlock lb = updateBlockForPipeline(); + newGS = lb.getBlock().getGenerationStamp(); + accessToken = lb.getBlockToken(); + + if (isCreateStage) { + block.setCurrentBlock(lb.getBlock()); + } + + // set up the pipeline again with the remaining nodes + success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, + isRecovery); failPacket4Testing(); errorState.checkRestartingNodeDeadline(nodes); - } // while - } - private boolean updateBlockAndCreateBlockOutputStream(boolean isRecovery, boolean setCurrentBlock) - throws IOException { - final LocatedBlock lb = updateBlockForPipeline(); - long newGS = lb.getBlock().getGenerationStamp(); - accessToken = lb.getBlockToken(); - if (setCurrentBlock) { - block.setCurrentBlock(lb.getBlock()); - } - // set up the pipeline again with the remaining nodes - final boolean success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, - isRecovery); if (success) { updatePipeline(newGS); } return success; } + private boolean checkMinReplicationSatisfied() { + return dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && + nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication; + } + /** * Sleep if a node is restarting. * This process is repeated until the deadline or the node starts back up. @@ -1701,14 +1710,11 @@ boolean handleRestartingDatanode() { * @return true if it should continue. 
*/ boolean handleBadDatanode() { - return handleBadDatanodeInternal(this.nodes, this.storageTypes, this.storageIDs); - } - private boolean handleBadDatanodeInternal(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) { final int badNodeIndex = errorState.getBadNodeIndex(); if (badNodeIndex >= 0) { if (nodes.length <= 1) { lastException.set(new IOException("All datanodes " - + Arrays.toString(nodes) + " are bad. Aborting...")); + + Arrays.toString(nodes) + " are bad. Aborting...")); streamerClosed = true; return false; } @@ -1803,7 +1809,7 @@ DatanodeInfo[] getExcludedNodes() { * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ - protected LocatedBlock nextBlockOutputStream() throws IOException { + protected LocatedBlock setupPipelineForCreate() throws IOException { LocatedBlock lb; DatanodeInfo[] nodes; StorageType[] nextStorageTypes; @@ -1825,32 +1831,20 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { nodes = lb.getLocations(); nextStorageTypes = lb.getStorageTypes(); nextStorageIDs = lb.getStorageIDs(); - + setPipeline(lb); // Connect to first DataNode in the list. success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, - 0L, false); + 0L, false) || setupPipelineForAppendOrRecovery(); if (!success) { - while (!success && dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && - nodes.length - 1 >= dfsClient.dtpReplaceDatanodeOnFailureReplication && - handleBadDatanodeInternal(nodes, nextStorageTypes, nextStorageIDs)) { - LOG.info("Proceeding to create block {} after excluding bad datanode from pipeline", this); - success = updateBlockAndCreateBlockOutputStream(false, true); - nodes = this.nodes; - nextStorageTypes = this.storageTypes; - nextStorageIDs = this.storageIDs; - } - if (!success) { - LOG.warn("Abandoning " + block); - dfsClient.namenode.abandonBlock(block.getCurrentBlock(), stat.getFileId(), src, dfsClient.clientName); - block.setCurrentBlock(null); - final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; - LOG.warn("Excluding datanode " + badNode); - excludedNodes.put(badNode, badNode); - if (this.nodes != null) { - setPipeline(null, null, null); - } - } + LOG.warn("Abandoning " + block); + dfsClient.namenode.abandonBlock(block.getCurrentBlock(), + stat.getFileId(), src, dfsClient.clientName); + block.setCurrentBlock(null); + final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; + LOG.warn("Excluding datanode " + badNode); + excludedNodes.put(badNode, badNode); + setPipeline(null, null, null); } } while (!success && --count >= 0); @@ -2280,4 +2274,4 @@ public String toString() { return extendedBlock == null ? 
"block==null" : "" + extendedBlock.getLocalBlock(); } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 79b4bbadce9c1..65639e646e98d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -90,7 +90,7 @@ private LocatedBlock getFollowingBlock() throws IOException { } @Override - protected LocatedBlock nextBlockOutputStream() throws IOException { + protected LocatedBlock setupPipelineForCreate() throws IOException { boolean success; LocatedBlock lb = getFollowingBlock(); block.setCurrentBlock(lb.getBlock()); @@ -101,7 +101,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { DatanodeInfo[] nodes = lb.getLocations(); StorageType[] storageTypes = lb.getStorageTypes(); String[] storageIDs = lb.getStorageIDs(); - + setPipeline(lb); // Connect to the DataNode. If fail the internal error state will be set. success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L, false); @@ -111,6 +111,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); + setPipeline(null, null, null); throw new IOException("Unable to create new block." + this); } return lb; @@ -122,18 +123,18 @@ LocatedBlock peekFollowingBlock() { } @Override - protected void setupPipelineInternal(DatanodeInfo[] nodes, + protected boolean setupPipelineInternal(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, String[] nodeStorageIDs) throws IOException { boolean success = false; while (!success && !streamerClosed() && dfsClient.clientRunning) { if (!handleRestartingDatanode()) { - return; + return false; } if (!handleBadDatanode()) { // for striped streamer if it is datanode error then close the stream // and return. no need to replace datanode - return; + return false; } // get a new generation stamp and an access token @@ -179,6 +180,7 @@ assert getErrorState().hasExternalError() setStreamerAsClosed(); } } // while + return success; } void setExternalError() { From 30284f9c79814ab4bcb8616214f0e78db098821b Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Fri, 2 Feb 2024 18:42:30 -0800 Subject: [PATCH 08/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Refactoring --- .../org/apache/hadoop/hdfs/DataStreamer.java | 23 +++++++++++-------- .../fsdataset/impl/FsDatasetImpl.java | 3 +++ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 2fc44b8376141..1ca971b28fbf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1525,7 +1525,8 @@ private void addDatanode2ExistingPipeline() throws IOException { // MIN_REPLICATION is set to 0 or less than zero, an exception will be // thrown if a replacement could not be found. 
- if (checkMinReplicationSatisfied()) { + if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && + nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { DFSClient.LOG.warn( "Failed to find a new datanode to add to the write pipeline," + " continue to write to the pipeline with " + nodes.length @@ -1606,6 +1607,9 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, * it can be written to. * This happens when a file is appended or data streaming fails * It keeps on trying until a pipeline is setup + * + * Returns boolean whether pipeline was setup successfully or not. + * This boolean is used upstream on whether to continue creating pipeline or throw exception */ private boolean setupPipelineForAppendOrRecovery() throws IOException { // Check number of datanodes. Note that if there is no healthy datanode, @@ -1634,16 +1638,20 @@ protected boolean setupPipelineInternal(DatanodeInfo[] datanodes, } final boolean isRecovery = errorState.hasInternalError() && !isCreateStage; - if (!handleBadDatanode()) { + + // During create stage, if we remove a node (nodes.length - 1) + // min replication should still be satisfied. + if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && + nodes.length - 1 >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) { return false; } - handleDatanodeReplacement(); - - if (isCreateStage && !checkMinReplicationSatisfied()) { + if (!handleBadDatanode()) { return false; } + handleDatanodeReplacement(); + // get a new generation stamp and an access token final LocatedBlock lb = updateBlockForPipeline(); newGS = lb.getBlock().getGenerationStamp(); @@ -1668,11 +1676,6 @@ protected boolean setupPipelineInternal(DatanodeInfo[] datanodes, return success; } - private boolean checkMinReplicationSatisfied() { - return dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && - nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication; - } - /** * Sleep if a node is restarting. * This process is repeated until the deadline or the node starts back up. 
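This patch pairs the client-side guard above with a datanode-side change, shown in the FsDatasetImpl hunk that follows: on a pipeline-create retry the client reuses the same block ID with a bumped generation stamp, so createRbw must clean up the replica left by the failed attempt instead of throwing ReplicaAlreadyExistsException. A minimal sketch of that contract, with simplified stand-in types rather than Hadoop's real FsDatasetSpi API:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative model only: newGS == 0L marks a first-time create, where an
    // existing replica is a genuine conflict; newGS != 0L marks a retry of
    // pipeline setup with a bumped generation stamp, where the stale replica
    // from the failed attempt is discarded and the block is recreated.
    final class RbwDatasetSketch {
      // generation stamp per replica, keyed by "blockPoolId:blockId"
      private final Map<String, Long> replicas = new HashMap<>();

      long createRbw(String bpid, long blockId, long newGS) throws IOException {
        final String key = bpid + ":" + blockId;
        if (replicas.containsKey(key)) {
          if (newGS != 0L) {
            replicas.remove(key); // retry: drop the replica of the failed attempt
          } else {
            throw new IOException("Block " + key
                + " already exists in state RBW and thus cannot be created.");
          }
        }
        final long gs = (newGS != 0L) ? newGS : 1L; // 1L: arbitrary first stamp
        replicas.put(key, gs);
        return gs;
      }

      public static void main(String[] args) throws IOException {
        RbwDatasetSketch ds = new RbwDatasetSketch();
        ds.createRbw("bp-1", 1001L, 0L); // first attempt succeeds
        ds.createRbw("bp-1", 1001L, 2L); // retry with a new GS replaces it
      }
    }

The sketch models only the in-memory replica map; in the real implementation, cleanupReplica is responsible for the replica's on-disk state as well.
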
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 636769145a8f1..5be095118fc38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1600,6 +1600,9 @@ public ReplicaHandler createRbw( ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { + // In case of retries with same blockPoolId + blockId as before + // with updated GS, cleanup the old replica to avoid + // any multiple copies with same blockPoolId + blockId if (newGS != 0L) { cleanupReplica(b.getBlockPoolId(), replicaInfo); } else { From 793cb89bc8de3bc7a411f8e642641aaa58b6000a Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Wed, 7 Feb 2024 14:43:57 -0800 Subject: [PATCH 09/17] [HDFS-17299] Adding rack failure tolerance when creating a new file - Refactoring --- .../org/apache/hadoop/hdfs/DataStreamer.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 1ca971b28fbf1..624dd21aa3b07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -414,6 +414,9 @@ synchronized void markFirstNodeIfNotMarked() { } synchronized void adjustState4RestartingNode() { + if (restartingNodeIndex == -1) { + return; + } // Just took care of a node error while waiting for a node restart if (restartingNodeIndex >= 0) { // If the error came from a node further away than the restarting @@ -1639,19 +1642,19 @@ protected boolean setupPipelineInternal(DatanodeInfo[] datanodes, final boolean isRecovery = errorState.hasInternalError() && !isCreateStage; - // During create stage, if we remove a node (nodes.length - 1) - // min replication should still be satisfied. - if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && - nodes.length - 1 >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) { - return false; - } - if (!handleBadDatanode()) { return false; } handleDatanodeReplacement(); + // During create stage, if we remove a node (nodes.length - 1) + // min replication should still be satisfied. 
+ if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && + nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) { + return false; + } + // get a new generation stamp and an access token final LocatedBlock lb = updateBlockForPipeline(); newGS = lb.getBlock().getGenerationStamp(); From fd2b388af316f4b2976d109313ce7b16a3d42d34 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Tue, 13 Feb 2024 14:02:20 -0800 Subject: [PATCH 10/17] Fixing Tests --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 624dd21aa3b07..f737a106f0506 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -684,6 +684,7 @@ protected void endBlock() { closeStream(); setPipeline(null, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; + lastAckedSeqno = -1; } private boolean shouldStop() { From 6f305edb5940744d650239d53397bc5b5f93c0c4 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Tue, 13 Feb 2024 15:19:27 -0800 Subject: [PATCH 11/17] Fixing Tests --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index f737a106f0506..8200554427943 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1827,7 +1827,7 @@ protected LocatedBlock setupPipelineForCreate() throws IOException { do { errorState.resetInternalError(); lastException.clear(); - + streamerClosed = false; DatanodeInfo[] excluded = getExcludedNodes(); lb = locateFollowingBlock( excluded.length > 0 ? 
          excluded : null, oldBlock);

From 3237a97060fdd3cab64911c8acd7a2dfba70a510 Mon Sep 17 00:00:00 2001
From: Ritesh Garg
Date: Wed, 14 Feb 2024 18:01:28 -0800
Subject: [PATCH 12/17] Fixing Tests

---
 .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 1 -
 .../org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java | 3 +++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8200554427943..ad3a2a013882c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -684,7 +684,6 @@ protected void endBlock() {
     closeStream();
     setPipeline(null, null, null);
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-    lastAckedSeqno = -1;
   }

   private boolean shouldStop() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
index 59cc154071668..95e4e4a4a834b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
@@ -89,6 +89,9 @@ public void testExcludedNodesForgiveness() throws IOException {
     conf.setLong(
         HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
         2500);
+    // Set the min replication for blocks being written to 1.
+    conf.setInt( HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
+        1);
     // We'll be using a 512 bytes block size just for tests
     // so making sure the checksum bytes too match it.
     conf.setInt("io.bytes.per.checksum", 512);

From 54a6fc2882d293ac0a53b922ac53301c626fe25e Mon Sep 17 00:00:00 2001
From: Ritesh Garg
Date: Wed, 14 Feb 2024 18:38:59 -0800
Subject: [PATCH 13/17] Fix formatting

---
 .../org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
index 95e4e4a4a834b..458a1de4b6eab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
@@ -90,7 +90,8 @@ public void testExcludedNodesForgiveness() throws IOException {
         HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
         2500);
     // Set the min replication for blocks being written to 1.
-    conf.setInt( HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
+    conf.setInt(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
         1);
     // We'll be using a 512 bytes block size just for tests
     // so making sure the checksum bytes too match it.
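Before the next revision, an aside: the MIN_REPLICATION key exercised by the test above can be set by any client, not just tests. Below is a hedged sketch of such a writer; the NameNode URI and output path are placeholders invented for this example, not values from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

import java.net.URI;

public class MinReplicationWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Tolerate pipeline shrinkage down to a single replica during writes,
    // mirroring the test configuration in the patch above.
    conf.setInt(
        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
        1);
    // "hdfs://localhost:8020" and the output path are placeholders for this sketch.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/min-replication-demo"))) {
      out.writeBytes("write survives datanode/rack failure during create\n");
    }
  }
}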
From 07e2d6241b633a34493a99416fdc8782155d1ad7 Mon Sep 17 00:00:00 2001
From: Ritesh Garg
Date: Fri, 16 Feb 2024 10:54:58 -0800
Subject: [PATCH 14/17] Fixing Tests

---
 .../java/org/apache/hadoop/hdfs/DataStreamer.java | 15 +++++++--------
 .../hadoop/hdfs/TestDFSClientExcludedNodes.java   |  4 ----
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index ad3a2a013882c..a5089a8070d23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1642,19 +1642,19 @@ protected boolean setupPipelineInternal(DatanodeInfo[] datanodes,

       final boolean isRecovery = errorState.hasInternalError() && !isCreateStage;
-      if (!handleBadDatanode()) {
-        return false;
-      }
-
-      handleDatanodeReplacement();
-
       // During the create stage, min replication must still be satisfied after
       // removing a node, i.e. with nodes.length - 1 nodes remaining.
       if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
-          nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+          nodes.length - 1 >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
         return false;
       }

+      if (!handleBadDatanode()) {
+        return false;
+      }
+
+      handleDatanodeReplacement();
+
       // get a new generation stamp and an access token
       final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
@@ -1826,7 +1826,6 @@ protected LocatedBlock setupPipelineForCreate() throws IOException {
     do {
       errorState.resetInternalError();
       lastException.clear();
-      streamerClosed = false;
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
           excluded.length > 0 ? excluded : null, oldBlock);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
index 458a1de4b6eab..59cc154071668 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
@@ -89,10 +89,6 @@ public void testExcludedNodesForgiveness() throws IOException {
     conf.setLong(
         HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
         2500);
-    // Set the min replication for blocks being written to 1.
-    conf.setInt(
-        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
-        1);
     // We'll be using a 512 bytes block size just for tests
     // so making sure the checksum bytes too match it.
conf.setInt("io.bytes.per.checksum", 512); From 1c100fa087340006515f2e6e7cc6ea4e438edfb9 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Fri, 16 Feb 2024 10:59:03 -0800 Subject: [PATCH 15/17] Fixing Tests --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index a5089a8070d23..273ff348809a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -414,9 +414,6 @@ synchronized void markFirstNodeIfNotMarked() { } synchronized void adjustState4RestartingNode() { - if (restartingNodeIndex == -1) { - return; - } // Just took care of a node error while waiting for a node restart if (restartingNodeIndex >= 0) { // If the error came from a node further away than the restarting From d270e027677d9733b20bfd87d607d0708b5a616b Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Fri, 16 Feb 2024 12:59:25 -0800 Subject: [PATCH 16/17] Fixing Tests --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 273ff348809a4..c4768eeb06047 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1871,7 +1871,6 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes, if (LOG.isDebugEnabled()) { LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this); } - // persist blocks on namenode on next flush persistBlocks.set(true); From cbb3abac2cd1d17b6a9bd5ee4834d77382db2374 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Mon, 19 Feb 2024 20:14:45 -0800 Subject: [PATCH 17/17] Fixing Tests --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index c4768eeb06047..a1724f58159d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1873,7 +1873,6 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes, } // persist blocks on namenode on next flush persistBlocks.set(true); - int refetchEncryptionKey = 1; while (true) { boolean result = false;
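To close, the createRbw change from the FsDatasetImpl hunk earlier in this series is easy to misread, so here is a toy model of its keying behavior. This is illustrative only: a plain HashMap stands in for the dataset's volumeMap, every name below is invented for the example, and the real cleanupReplica work (which this model does not attempt to reproduce) is reduced to a map removal.

import java.util.HashMap;
import java.util.Map;

// Toy model of the createRbw retry semantics: a retried create for the same
// (blockPoolId, blockId) arriving with a nonzero, updated generation stamp
// replaces the stale replica instead of colliding with it.
public class CreateRbwRetryModel {
  record Key(String blockPoolId, long blockId) { }
  record Replica(long generationStamp) { }

  private final Map<Key, Replica> volumeMap = new HashMap<>();

  Replica createRbw(String bpid, long blockId, long newGS) {
    Key key = new Key(bpid, blockId);
    Replica existing = volumeMap.get(key);
    if (existing != null) {
      if (newGS != 0L) {
        // Retry with an updated GS: drop the old replica so we never keep
        // two copies under the same blockPoolId + blockId.
        volumeMap.remove(key);
      } else {
        throw new IllegalStateException("Block already exists: " + key);
      }
    }
    Replica fresh = new Replica(newGS);
    volumeMap.put(key, fresh);
    return fresh;
  }

  public static void main(String[] args) {
    CreateRbwRetryModel model = new CreateRbwRetryModel();
    model.createRbw("BP-1", 42L, 0L);     // initial create succeeds
    model.createRbw("BP-1", 42L, 1001L);  // retry with a new GS replaces it
    System.out.println("replicas tracked: " + model.volumeMap.size()); // 1
  }
}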