diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java index 6f2b5e22ac972..36d4fe2a9f1ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.Block; import java.util.List; @@ -35,7 +36,7 @@ @InterfaceStability.Unstable public abstract class Event { public static enum EventType { - CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK, TRUNCATE + CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK, TRUNCATE, ADD_BLOCK } private EventType eventType; @@ -530,6 +531,77 @@ public String toString() { } + @InterfaceAudience.Public + public static class AddBlockEvent extends Event { + private String path; + private String blockPoolId; + private Block lastBlock; + private Block penultimateBlock; + + public static class Builder { + private String path; + private String blockPoolId; + private Block lastBlock; + private Block penultimateBlock; + + public Builder setPath(String path) { + this.path = path; + return this; + } + + public Builder setBlockPoolId(String blockPoolId) { + this.blockPoolId = blockPoolId; + return this; + } + + public Builder setLastBlock(Block lastBlock) { + this.lastBlock = lastBlock; + return this; + } + + public Builder setPenultimateBlock(Block penultimateBlock) { + this.penultimateBlock = penultimateBlock; + return this; + } + + public AddBlockEvent build() { + return new AddBlockEvent(this); + } + } + + public AddBlockEvent(Builder builder) { + super(EventType.ADD_BLOCK); + this.path = builder.path; + this.blockPoolId = builder.blockPoolId; + this.penultimateBlock = builder.penultimateBlock; + this.lastBlock = builder.lastBlock; + } + + public String getPath() { + return path; + } + + public String getBlockPoolId() { + return blockPoolId; + } + + public Block getLastBlock() { + return lastBlock; + } + + public Block getPenultimateBlock() { + return penultimateBlock; + } + + @Override + @InterfaceStability.Unstable + public String toString() { + return "AddBlockEvent [path=" + path + ", poolId=" + blockPoolId + + ", penultimateBlock=" + penultimateBlock + ", lastBlock=" + lastBlock + "]"; + } + + } + /** * Sent when an existing file is opened for append. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 6c194b4022130..94ce455b9a22f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -124,6 +124,32 @@ public interface ClientProtocol { LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException; + /////////////////////////////////////// + // Block contents + /////////////////////////////////////// + /** + * Get locations of the block of the specified Extended Block + * DataNode locations for each block are sorted by + * the proximity to the client. (TODO: rg - do we really need this here?) + *

+ * Return a {@link LocatedBlock} for the requested block, including the
+ * block token and the DataNode locations that currently hold a replica of it.
+ *

+ * The client will then have to contact + * one of the indicated DataNodes to obtain the actual data. + * + * @param block the block you would like to grab + * @return block with locations + * + * @throws org.apache.hadoop.security.AccessControlException If access is + * denied + * @throws IOException If an I/O error occurred + */ + @Idempotent + LocatedBlock getBlockLocation(ExtendedBlock block) throws IOException; + /** * Get server default values for a number of configuration params. * @return a set of server default configuration values diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 005f26e4d63e5..d1039d585bdb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -54,6 +54,14 @@ message GetBlockLocationsResponseProto { optional LocatedBlocksProto locations = 1; } +message GetBlockLocationRequestProto { + required ExtendedBlockProto block = 1; +} + +message GetBlockLocationResponseProto { + optional LocatedBlockProto block = 1; +} + message GetServerDefaultsRequestProto { // No parameters } @@ -722,6 +730,8 @@ message GetEditsFromTxidResponseProto { service ClientNamenodeProtocol { rpc getBlockLocations(GetBlockLocationsRequestProto) returns(GetBlockLocationsResponseProto); + rpc getBlockLocation(GetBlockLocationRequestProto) + returns(GetBlockLocationResponseProto); rpc getServerDefaults(GetServerDefaultsRequestProto) returns(GetServerDefaultsResponseProto); rpc create(CreateRequestProto)returns(CreateResponseProto); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto index 5339902958272..2f5e736942bc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto @@ -42,6 +42,7 @@ enum EventType { EVENT_METADATA = 0x4; EVENT_UNLINK = 0x5; EVENT_TRUNCATE = 0x6; + EVENT_ADDBLOCK = 0x7; } message EventProto { @@ -105,6 +106,14 @@ message RenameEventProto { required int64 timestamp = 3; } +message AddBlockEventProto { + required string srcPath = 1; + required string blockPoolId = 2; + optional BlockProto penultimateBlock = 3; + optional BlockProto lastblock = 4; + +} + message MetadataUpdateEventProto { required string path = 1; required MetadataUpdateType type = 2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 4e1d6978cad7a..cf0a9100a91d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.reflect.Proxy; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -47,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.TreeSet; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; @@ -157,6 +157,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import 
org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.DataOutputBuffer; @@ -167,7 +168,6 @@ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -3223,4 +3223,90 @@ TraceScope getSrcDstTraceScope(String description, String src, String dst) { } return scope; } + + void copyBlock(LocatedBlock lblock, OutputStream fos) throws Exception { + int failures = 0; + InetSocketAddress targetAddr = null; + TreeSet deadNodes = new TreeSet(); + BlockReader blockReader = null; + ExtendedBlock block = lblock.getBlock(); + + while (blockReader == null) { + DatanodeInfo chosenNode; + + try { + chosenNode = bestNode(lblock.getLocations(), deadNodes); + targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr()); + } catch (IOException ie) { + if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) { + throw new IOException("Could not obtain block " + lblock, ie); + } + LOG.info("Could not obtain block from any node: " + ie); + try { + Thread.sleep(10000); + } catch (InterruptedException iex) { + } + deadNodes.clear(); + failures++; + continue; + } + try { + String file = BlockReaderFactory.getFileName(targetAddr, + block.getBlockPoolId(), block.getBlockId()); + blockReader = new BlockReaderFactory(dfsClientConf). + setFileName(file). + setBlock(block). + setBlockToken(lblock.getBlockToken()). + setStartOffset(0). + setLength(-1). + setVerifyChecksum(true). + setClientName("fsck"). + setDatanodeInfo(chosenNode). + setInetSocketAddress(targetAddr). + setCachingStrategy(CachingStrategy.newDropBehind()). + setClientCacheContext(getClientContext()). + setConfiguration(conf). + setRemotePeerFactory(this). 
+ build(); + } catch (IOException ex) { + // Put chosen node into dead list, continue + LOG.info("Failed to connect to " + targetAddr + ":" + ex); + deadNodes.add(chosenNode); + } + } + byte[] buf = new byte[1024]; + int cnt = 0; + boolean success = true; + long bytesRead = 0; + try { + while ((cnt = blockReader.read(buf, 0, buf.length)) > 0) { + fos.write(buf, 0, cnt); + bytesRead += cnt; + } + if ( bytesRead != block.getNumBytes() ) { + throw new IOException("Recorded block size is " + block.getNumBytes() + + ", but datanode returned " +bytesRead+" bytes"); + } + } catch (Exception e) { + LOG.error("Error reading block", e); + success = false; + } finally { + blockReader.close(); + } + if (!success) { + throw new Exception("Could not copy block data for " + lblock.getBlock()); + } + } + + private static DatanodeInfo bestNode(DatanodeInfo[] nodes, TreeSet deadNodes) + throws IOException { + if ((nodes == null) || (nodes.length - deadNodes.size() < 1)) { + throw new IOException("No live nodes contain current block"); + } + DatanodeInfo chosenNode; + do { + chosenNode = nodes[ThreadLocalRandom.current().nextInt(nodes.length)]; + } while (deadNodes.contains(chosenNode)); + return chosenNode; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 02b20d6280476..78bc108a96562 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclResponseProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; @@ -227,6 +228,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import static org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.*; + /** * This class is used on the server side. Calls come across the wire for the * for protocol {@link ClientNamenodeProtocolPB}. 
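For context on how the copyBlock/bestNode helpers above are meant to be driven: the caller supplies a LocatedBlock (obtained through the new getBlockLocation RPC) and an output stream. A minimal caller sketch, mirroring TestBlockLevelCopying at the end of this patch; the names dfsClient, locatedBlock, and out are illustrative, not part of the patch:

    java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
    // streams the raw bytes of one replica of the block into 'out'
    dfsClient.copyBlock(locatedBlock, out);
    byte[] rawBlockBytes = out.toByteArray(); // length is checked against block.getNumBytes()

As written, copyBlock marks unreachable DataNodes dead and moves on to another replica; only after all replicas fail does it sleep and retry, giving up after DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT rounds, so the caller does not need its own retry loop.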
@@ -378,6 +381,22 @@ public GetBlockLocationsResponseProto getBlockLocations( } } + @Override + public GetBlockLocationResponseProto getBlockLocation( + RpcController controller, GetBlockLocationRequestProto request) + throws ServiceException { + try { + LocatedBlock block = server.getBlockLocation(PBHelper.convert(request.getBlock())); + GetBlockLocationResponseProto.Builder builder = GetBlockLocationResponseProto.newBuilder(); + if (block != null) { + builder.setBlock(PBHelper.convert(block)).build(); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public GetServerDefaultsResponseProto getServerDefaults( RpcController controller, GetServerDefaultsRequestProto req) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 7e57b9731ef14..d1b474df281b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclRequestProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto; @@ -98,6 +99,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; @@ -262,6 +264,22 @@ public LocatedBlocks getBlockLocations(String src, long offset, long length) } } + @Override + public LocatedBlock getBlockLocation(ExtendedBlock block) throws IOException { + GetBlockLocationRequestProto req = GetBlockLocationRequestProto + .newBuilder() + .setBlock(PBHelper.convert(block)) + .build(); + try { + ClientNamenodeProtocolProtos.GetBlockLocationResponseProto + resp = rpcProxy.getBlockLocation(null, req); + return resp != null ? 
+ PBHelper.convert(resp.getBlock()) : null; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public FsServerDefaults getServerDefaults() throws IOException { GetServerDefaultsRequestProto req = VOID_GET_SERVER_DEFAULT_REQUEST; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index a2522622879fe..e5bd863436c89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -2692,6 +2692,14 @@ public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws events.add(new Event.TruncateEvent(truncate.getPath(), truncate.getFileSize(), truncate.getTimestamp())); break; + case EVENT_ADDBLOCK: + InotifyProtos.AddBlockEventProto addBlock = + InotifyProtos.AddBlockEventProto.parseFrom(p.getContents()); + events.add(new Event.AddBlockEvent.Builder().setPath(addBlock.getSrcPath()) + .setBlockPoolId(addBlock.getBlockPoolId()) + .setPenultimateBlock(convert(addBlock.getPenultimateBlock())) + .setLastBlock(convert(addBlock.getLastblock())).build()); + break; default: throw new RuntimeException("Unexpected inotify event type: " + p.getType()); @@ -2809,6 +2817,24 @@ public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList .setTimestamp(te.getTimestamp()).build().toByteString() ).build()); break; + case ADD_BLOCK: + Event.AddBlockEvent abe = (Event.AddBlockEvent) e; + InotifyProtos.AddBlockEventProto.Builder addBlockBuilder = + InotifyProtos.AddBlockEventProto.newBuilder().setSrcPath(abe.getPath()) + .setBlockPoolId(abe.getBlockPoolId()); + if (abe.getPenultimateBlock() != null) { + addBlockBuilder.setPenultimateBlock(convert(abe.getPenultimateBlock())); + } + if (abe.getLastBlock() != null) { + addBlockBuilder.setLastblock(convert(abe.getLastBlock())).build().toByteString(); + } + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_ADDBLOCK) + .setContents( + addBlockBuilder.build().toByteString() + ).build()); + break; + default: throw new RuntimeException("Unexpected inotify event: " + e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e0eae26ae6b4c..29cd7619b1dce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -826,8 +826,7 @@ private LocatedBlock createLocatedBlock(final BlockInfo[] blocks, return createLocatedBlock(blocks[curBlk], curPos, mode); } - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos, - final AccessMode mode) throws IOException { + public LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos, final AccessMode mode) throws IOException { final LocatedBlock lb = createLocatedBlock(blk, pos); if (mode != null) { setBlockToken(lb, mode); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java 
index 2a026af6b74ed..fc943cead1931 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -102,7 +102,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.client.BlockReportOptions; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index 14f4d66e885c9..1745a131e019e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.io.Charsets; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException; @@ -27,8 +28,11 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FsPermissionExtension; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -36,14 +40,22 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.util.ReadOnlyList; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; +import java.util.List; import static org.apache.hadoop.util.Time.now; @@ -201,6 +213,26 @@ static GetBlockLocationsResult getBlockLocations( } } + /** + * Get block locations within the specified range. 
+ * @see ClientProtocol#getBlockLocations(String, long, long) + * @throws IOException + */ + static LocatedBlock getBlockLocation(FSNamesystem nameSystem, CacheManager cm, + ExtendedBlock extendedBlock) throws IOException { + BlockManager blockManager = nameSystem.getBlockManager(); + BlockTokenIdentifier.AccessMode mode = BlockTokenIdentifier.AccessMode.READ; + nameSystem.readLock(); + try { + BlockInfo blockInfo = blockManager.getStoredBlock(extendedBlock.getLocalBlock()); + // should we check for complete as well or is that up to the client + // should we add this block to the cache manager? + return blockManager.createLocatedBlock(blockInfo, -1, mode); + } finally { + nameSystem.readUnlock(); + } + } + private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED ? inodePolicy : parentPolicy; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 57bee5a8f195d..4ca65ca57e70b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1760,6 +1760,40 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg, return blocks; } + LocatedBlock getBlockLocation(ExtendedBlock block) throws IOException { + checkOperation(OperationCategory.READ); + LocatedBlock res = null; + readLock(); + try { + checkOperation(OperationCategory.READ); + res = FSDirStatAndListingOp.getBlockLocation(this, getCacheManager(), block); + if (res != null) { + if (isInSafeMode()) { + // if safemode & no block locations yet then throw safemodeException + if ((res.getLocations() == null) || (res.getLocations().length == 0)) { + SafeModeException se = newSafemodeException( + "Zero blocklocations for " + block); + if (haEnabled && haContext != null && + haContext.getState().getServiceState() == HAServiceState.ACTIVE) { + throw new RetriableException(se); + } else { + throw se; + } + } + } + } + } catch (AccessControlException e) { + logAuditEvent(false, "open", block.toString()); + throw e; + } finally { + readUnlock(); + } + + // TODO rg: sort by distance to datanodes do we care, we can do that when she + // get the stream from the dfs client for that block + return res; + } + /** * Moves all the blocks from {@code srcs} and appends them to {@code target} * To avoid rollbacks we will verify validity of ALL of the args diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java index 09181074643ff..b062a9cc9609c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java @@ -23,6 +23,9 @@ import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import 
java.util.List;
@@ -40,7 +43,7 @@ private static long getSize(FSEditLogOp.AddCloseOp acOp) {
     return size;
   }
 
-  public static EventBatch translate(FSEditLogOp op) {
+  public static EventBatch translate(FSEditLogOp op, String blockPoolId) {
     switch(op.opCode) {
     case OP_ADD:
       FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
@@ -180,6 +183,16 @@ public static EventBatch translate(FSEditLogOp op) {
       FSEditLogOp.TruncateOp tOp = (FSEditLogOp.TruncateOp) op;
       return new EventBatch(op.txid, new Event[] {
           new Event.TruncateEvent(tOp.src, tOp.newLength, tOp.timestamp) });
+    case OP_ADD_BLOCK:
+      FSEditLogOp.AddBlockOp addBlockOp = (FSEditLogOp.AddBlockOp) op;
+      Block penultimateBlock = addBlockOp.getPenultimateBlock();
+      Block lastBlock = addBlockOp.getLastBlock();
+      return new EventBatch(op.txid,
+          new Event[] { new Event.AddBlockEvent.Builder()
+              .setBlockPoolId(blockPoolId)
+              .setPath(addBlockOp.getPath())
+              .setPenultimateBlock(penultimateBlock)
+              .setLastBlock(lastBlock).build() });
     default:
       return null;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index ccca18b6005d7..acb9cac333556 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -590,7 +590,14 @@ public LocatedBlocks getBlockLocations(String src,
     return namesystem.getBlockLocations(getClientMachine(), src, offset, length);
   }
-
+
+  @Override
+  public LocatedBlock getBlockLocation(ExtendedBlock block) throws IOException {
+    checkNNStartup();
+    metrics.incrGetBlockLocations();
+    return namesystem.getBlockLocation(block);
+  }
+
   @Override // ClientProtocol
   public FsServerDefaults getServerDefaults() throws IOException {
     checkNNStartup();
@@ -1986,7 +1993,7 @@ public EventBatchList getEditsFromTxid(long txid) throws IOException {
         break;
       }
 
-      EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
+      EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op, namesystem.getBlockPoolId());
       if (eventBatch != null) {
         batches.add(eventBatch);
         totalEvents += eventBatch.getEvents().length;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockLevelCopying.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockLevelCopying.java
new file mode 100644
index 0000000000000..25adeee8eb7ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockLevelCopying.java
@@ -0,0 +1,132 @@
+package org.apache.hadoop.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class TestBlockLevelCopying {
+
+  private MiniDFSCluster clusterA;
+  private int blockSize;
+
+  @Before
+  public void setUp() throws Exception {
+    blockSize = 64;
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.namenode.fs-limits.min-block-size", 1);
+    conf.setInt("dfs.bytes-per-checksum", blockSize);
+    conf.setInt("dfs.blocksize", blockSize);
+    clusterA = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+  }
+
+  @Test
+  public void testCopyingABlockOverWithINotify() throws Exception {
+    DFSClient clientA = new DFSClient(clusterA.getURI(), clusterA.getConfiguration(0));
+    DFSInotifyEventInputStream stream = clientA.getInotifyEventStream();
+
+    String randomFileName = UUID.randomUUID().toString();
+    Path path = new Path("/" + randomFileName);
+    createFile(clusterA, path);
+    DistributedFileSystem clusterAFS = clusterA.getFileSystem();
+    FileStatus fileStatus = clusterAFS.getFileStatus(path);
+    BlockLocation[] locations = clusterAFS.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+    assertEquals(2, locations.length); // should be two blocks
+
+    // drain the inotify stream to collect the extended blocks that were added
+    List<ExtendedBlock> extendedBlocks = getAddedBlocksFromEventStream(stream);
+    assertEquals(2, extendedBlocks.size()); // still have 2 blocks
+
+    // fetch a LocatedBlock for each extended block from the NameNode via the new RPC
+    List<LocatedBlock> locatedBlocks = Lists.newArrayList();
+    for (ExtendedBlock extendedBlock : extendedBlocks) {
+      locatedBlocks.add(clusterA.getNameNode().getRpcServer().getBlockLocation(extendedBlock));
+    }
+    assertEquals(2, locatedBlocks.size()); // still have 2 blocks
+
+    // read the first block's data through a regular DFSInputStream
+    DFSInputStream dfsInputStream = clientA.open(path.toString(), blockSize, true);
+    byte[] result = new byte[blockSize];
+    dfsInputStream.read(result);
+    String expected = Hex.encodeHexString(result);
+
+    // copy the same block's raw bytes with copyBlock and compare against the stream read
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    clientA.copyBlock(locatedBlocks.get(0), baos);
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    String actual = Hex.encodeHexString(IOUtils.toByteArray(bais));
+
+    assertEquals(expected, actual);
+  }
+
+  private void createFile(MiniDFSCluster cluster, Path file) throws Exception {
+    DistributedFileSystem fs = cluster.getFileSystem();
+    FSDataOutputStream outputStream = fs.create(file, true, 4, (short) 3, blockSize);
+    outputStream.close();
+    outputStream = fs.append(file);
+    int numInts = blockSize / 4;
+    int i = 0;
+    while (i < numInts) {
+      outputStream.writeInt(0);
+      i++;
+    }
+    int j = 0;
+    while (j < numInts) {
+      outputStream.writeInt(1);
+      j++;
+    }
+    outputStream.close();
+  }
+
+  private List<ExtendedBlock> getAddedBlocksFromEventStream(DFSInotifyEventInputStream stream) throws Exception {
+    List<ExtendedBlock> extendedBlocks = Lists.newArrayList();
+    EventBatch eventBatch;
+    while ((eventBatch = stream.poll()) != null) {
+      for (Event event : eventBatch.getEvents()) {
+        if (event.getEventType().equals(Event.EventType.ADD_BLOCK)) {
+          Event.AddBlockEvent addBlockEvent = (Event.AddBlockEvent) event;
+          extendedBlocks
+              .add(new ExtendedBlock(addBlockEvent.getBlockPoolId(), addBlockEvent.getLastBlock()));
+        }
+      }
+    }
+    return extendedBlocks;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
index beabfc3341906..53fccc2e5d416 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
@@ -63,7 +63,7 @@ public class TestPersistBlocks {
   private static final String FILE_NAME = "/data";
   private static final Path FILE_PATH = new Path(FILE_NAME);
-
+
   static final byte[] DATA_BEFORE_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
   static final byte[] DATA_AFTER_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestThisStupidThing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestThisStupidThing.java
new file mode 100644
index 0000000000000..952e64ba14120
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestThisStupidThing.java
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class TestThisStupidThing {
+
+  @Rule public TemporaryFolder folder = new TemporaryFolder();
+  private MiniDFSCluster cluster;
+  private String nameNodeURI;
+  public static final int BLOCK_SIZE = 1024;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    File baseDir = folder.newFolder("test").getAbsoluteFile();
+    FileUtil.fullyDelete(baseDir);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+    conf.setInt("dfs.namenode.fs-limits.min-block-size", 1);
+    conf.setInt("dfs.blocksize", 1024);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    cluster = builder.build();
+    nameNodeURI = "hdfs://localhost:" + cluster.getNameNodePort() + "/";
+  }
+
+  @Test
+  public void writingAcrossBlocks() throws Exception {
+    FileSystem fileSystem = FileSystem.get(URI.create(nameNodeURI), cluster.getConfiguration(0));
+    Path path = new Path("/foo/testAppend.out");
+    int numBlocks = 2;
+    byte[] data = new byte[BLOCK_SIZE * numBlocks];
+    // the file must exist before it can be appended to
+    fileSystem.create(path, true, 1024, (short) 1, BLOCK_SIZE).close();
+    FSDataOutputStream stream = fileSystem.append(path, 1024);
+    new Random().nextBytes(data);
+    stream.write(data);
+    stream.close();
+  }
+}
\ No newline at end of file
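Taken together, the pieces in this patch are intended to support block-level copying driven by inotify: the edit-log translator now surfaces ADD_BLOCK events, the NameNode can resolve an ExtendedBlock to a LocatedBlock through the new getBlockLocation RPC, and DFSClient.copyBlock can then pull that block's raw bytes from a DataNode. A condensed consumer sketch, written as a helper method inside a test class like TestBlockLevelCopying above; the DFSClient and MiniDFSCluster parameters are assumed to be set up as in that test, and the method name copyAddedBlocks is illustrative, not part of the patch:

    // Copy every block reported by inotify as having been added on the source cluster.
    private void copyAddedBlocks(DFSClient client, MiniDFSCluster cluster) throws Exception {
      DFSInotifyEventInputStream events = client.getInotifyEventStream();
      EventBatch batch;
      while ((batch = events.poll()) != null) {
        for (Event e : batch.getEvents()) {
          if (e.getEventType() == Event.EventType.ADD_BLOCK) {
            Event.AddBlockEvent abe = (Event.AddBlockEvent) e;
            // rebuild the ExtendedBlock from the event's pool id and last block
            ExtendedBlock blk = new ExtendedBlock(abe.getBlockPoolId(), abe.getLastBlock());
            // resolve it to replica locations via the new ClientProtocol RPC
            LocatedBlock located = cluster.getNameNode().getRpcServer().getBlockLocation(blk);
            // stream the raw block bytes from one of those replicas
            ByteArrayOutputStream copy = new ByteArrayOutputStream();
            client.copyBlock(located, copy);
          }
        }
      }
    }

Since getBlockLocation is served under the NameNode read lock and reuses the existing block-token path (createLocatedBlock with AccessMode.READ), the returned LocatedBlock can be handed straight to copyBlock.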