Event.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;

import java.util.List;

@@ -35,7 +36,7 @@
@InterfaceStability.Unstable
public abstract class Event {
  public static enum EventType {
    CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK, TRUNCATE
    CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK, TRUNCATE, ADD_BLOCK
  }

  private EventType eventType;
@@ -530,6 +531,77 @@ public String toString() {

  }

  /**
   * Sent when a new block is added to a file. The penultimate block may be
   * null when the added block is the file's first.
   */
  @InterfaceAudience.Public
  public static class AddBlockEvent extends Event {
    private String path;
    private String blockPoolId;
    private Block lastBlock;
    private Block penultimateBlock;

    public static class Builder {
      private String path;
      private String blockPoolId;
      private Block lastBlock;
      private Block penultimateBlock;

      public Builder setPath(String path) {
        this.path = path;
        return this;
      }

      public Builder setBlockPoolId(String blockPoolId) {
        this.blockPoolId = blockPoolId;
        return this;
      }

      public Builder setLastBlock(Block lastBlock) {
        this.lastBlock = lastBlock;
        return this;
      }

      public Builder setPenultimateBlock(Block penultimateBlock) {
        this.penultimateBlock = penultimateBlock;
        return this;
      }

      public AddBlockEvent build() {
        return new AddBlockEvent(this);
      }
    }

    public AddBlockEvent(Builder builder) {
      super(EventType.ADD_BLOCK);
      this.path = builder.path;
      this.blockPoolId = builder.blockPoolId;
      this.penultimateBlock = builder.penultimateBlock;
      this.lastBlock = builder.lastBlock;
    }

    public String getPath() {
      return path;
    }

    public String getBlockPoolId() {
      return blockPoolId;
    }

    public Block getLastBlock() {
      return lastBlock;
    }

    public Block getPenultimateBlock() {
      return penultimateBlock;
    }

    @Override
    @InterfaceStability.Unstable
    public String toString() {
      return "AddBlockEvent [path=" + path + ", poolId=" + blockPoolId
          + ", penultimateBlock=" + penultimateBlock
          + ", lastBlock=" + lastBlock + "]";
    }
  }

  /**
   * Sent when an existing file is opened for append.
   */
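As context for the new event type, here is a minimal consumer sketch. It assumes the stream is obtained through the existing HdfsAdmin#getInotifyEventStream API; the NameNode URI is a placeholder, and only EventType.ADD_BLOCK and the AddBlockEvent accessors come from this patch.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

// Hypothetical consumer loop for the new ADD_BLOCK event.
static void watchAddBlocks(Configuration conf) throws Exception {
  HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://localhost:8020"), conf);
  DFSInotifyEventInputStream events = admin.getInotifyEventStream();
  while (true) {
    EventBatch batch = events.take(); // blocks until edits arrive
    for (Event e : batch.getEvents()) {
      if (e.getEventType() == Event.EventType.ADD_BLOCK) {
        Event.AddBlockEvent abe = (Event.AddBlockEvent) e;
        System.out.println("block " + abe.getLastBlock() + " added to "
            + abe.getPath() + " in pool " + abe.getBlockPoolId());
      }
    }
  }
}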
ClientProtocol.java
@@ -124,6 +124,32 @@ public interface ClientProtocol {
  LocatedBlocks getBlockLocations(String src, long offset, long length)
      throws IOException;

  ///////////////////////////////////////
  // Block contents
  ///////////////////////////////////////
  /**
   * Get the locations of the specified extended block.
   * <p>
   * Returns a {@link LocatedBlock} that contains the block and its
   * DataNode locations, sorted by proximity to the client's address.
   * <p>
   * The client will then have to contact
   * one of the indicated DataNodes to obtain the actual data.
   *
   * @param block the extended block to locate
   * @return the block with its locations, or null if the block is not found
   *
   * @throws org.apache.hadoop.security.AccessControlException If access is
   *           denied
   * @throws IOException If an I/O error occurred
   */
  @Idempotent
  LocatedBlock getBlockLocation(ExtendedBlock block) throws IOException;

  /**
   * Get server default values for a number of configuration params.
   * @return a set of server default configuration values
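Before moving to the wire format, a hedged usage sketch for the new RPC. NameNodeProxies is an internal HDFS class, so this is illustrative rather than a supported client API; the block pool and block IDs are placeholders.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

// Hypothetical caller: look up one block by ID and print where its
// replicas live.
static void printBlockLocations(Configuration conf) throws IOException {
  ClientProtocol namenode = NameNodeProxies
      .createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class)
      .getProxy();
  ExtendedBlock blk = new ExtendedBlock("BP-1-127.0.0.1-1", 1073741825L); // placeholders
  LocatedBlock located = namenode.getBlockLocation(blk);
  if (located == null) {
    System.out.println("block not found");
    return;
  }
  for (DatanodeInfo dn : located.getLocations()) {
    System.out.println(dn.getXferAddr());
  }
}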
ClientNamenodeProtocol.proto
@@ -54,6 +54,14 @@ message GetBlockLocationsResponseProto {
  optional LocatedBlocksProto locations = 1;
}

message GetBlockLocationRequestProto {
  required ExtendedBlockProto block = 1;
}

message GetBlockLocationResponseProto {
  optional LocatedBlockProto block = 1;
}

message GetServerDefaultsRequestProto { // No parameters
}

@@ -722,6 +730,8 @@ message GetEditsFromTxidResponseProto {
service ClientNamenodeProtocol {
  rpc getBlockLocations(GetBlockLocationsRequestProto)
      returns(GetBlockLocationsResponseProto);
  rpc getBlockLocation(GetBlockLocationRequestProto)
      returns(GetBlockLocationResponseProto);
  rpc getServerDefaults(GetServerDefaultsRequestProto)
      returns(GetServerDefaultsResponseProto);
  rpc create(CreateRequestProto)returns(CreateResponseProto);
inotify.proto
@@ -42,6 +42,7 @@ enum EventType {
  EVENT_METADATA = 0x4;
  EVENT_UNLINK = 0x5;
  EVENT_TRUNCATE = 0x6;
  EVENT_ADDBLOCK = 0x7;
}

message EventProto {
@@ -105,6 +106,14 @@ message RenameEventProto {
  required int64 timestamp = 3;
}

message AddBlockEventProto {
  required string srcPath = 1;
  required string blockPoolId = 2;
  optional BlockProto penultimateBlock = 3;
  optional BlockProto lastBlock = 4;
}

message MetadataUpdateEventProto {
  required string path = 1;
  required MetadataUpdateType type = 2;
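The patch does not show the PBHelper side, but the proto-to-Java conversion for the new event presumably mirrors the other inotify event types. A sketch under that assumption; the helper name and its call site are invented, and only the proto accessors follow from the message definition above.

// Hedged sketch: convert the wire form into the public AddBlockEvent.
// InotifyProtos is the generated outer class for this .proto file.
static Event.AddBlockEvent fromProto(InotifyProtos.AddBlockEventProto p) {
  return new Event.AddBlockEvent.Builder()
      .setPath(p.getSrcPath())
      .setBlockPoolId(p.getBlockPoolId())
      .setPenultimateBlock(p.hasPenultimateBlock()
          ? PBHelper.convert(p.getPenultimateBlock()) : null) // absent for a file's first block
      .setLastBlock(p.hasLastBlock()
          ? PBHelper.convert(p.getLastBlock()) : null)
      .build();
}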
DFSClient.java
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -47,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
@@ -157,6 +157,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -167,7 +168,6 @@
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -3223,4 +3223,90 @@ TraceScope getSrcDstTraceScope(String description, String src, String dst) {
    }
    return scope;
  }

  /**
   * Copy the contents of one block to the given output stream, trying each
   * of its DataNode locations in turn until a read succeeds. This mirrors
   * NamenodeFsck#copyBlock, including its "fsck" client name.
   */
  void copyBlock(LocatedBlock lblock, OutputStream fos) throws Exception {
    int failures = 0;
    InetSocketAddress targetAddr = null;
    TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
    BlockReader blockReader = null;
    ExtendedBlock block = lblock.getBlock();

    while (blockReader == null) {
      DatanodeInfo chosenNode;

      try {
        chosenNode = bestNode(lblock.getLocations(), deadNodes);
        targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
      } catch (IOException ie) {
        if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
          throw new IOException("Could not obtain block " + lblock, ie);
        }
        LOG.info("Could not obtain block from any node: " + ie);
        try {
          Thread.sleep(10000);
        } catch (InterruptedException iex) {
          // ignore and retry
        }
        deadNodes.clear();
        failures++;
        continue;
      }
      try {
        String file = BlockReaderFactory.getFileName(targetAddr,
            block.getBlockPoolId(), block.getBlockId());
        blockReader = new BlockReaderFactory(dfsClientConf).
            setFileName(file).
            setBlock(block).
            setBlockToken(lblock.getBlockToken()).
            setStartOffset(0).
            setLength(-1).
            setVerifyChecksum(true).
            setClientName("fsck").
            setDatanodeInfo(chosenNode).
            setInetSocketAddress(targetAddr).
            setCachingStrategy(CachingStrategy.newDropBehind()).
            setClientCacheContext(getClientContext()).
            setConfiguration(conf).
            setRemotePeerFactory(this).
            build();
      } catch (IOException ex) {
        // Put chosen node into dead list, continue
        LOG.info("Failed to connect to " + targetAddr + ": " + ex);
        deadNodes.add(chosenNode);
      }
    }
    byte[] buf = new byte[1024];
    int cnt = 0;
    boolean success = true;
    long bytesRead = 0;
    try {
      while ((cnt = blockReader.read(buf, 0, buf.length)) > 0) {
        fos.write(buf, 0, cnt);
        bytesRead += cnt;
      }
      if (bytesRead != block.getNumBytes()) {
        throw new IOException("Recorded block size is " + block.getNumBytes() +
            ", but datanode returned " + bytesRead + " bytes");
      }
    } catch (Exception e) {
      LOG.error("Error reading block", e);
      success = false;
    } finally {
      blockReader.close();
    }
    if (!success) {
      throw new Exception("Could not copy block data for " + lblock.getBlock());
    }
  }

  /**
   * Pick a random live DataNode that is not already in the dead set.
   */
  private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
      TreeSet<DatanodeInfo> deadNodes) throws IOException {
    if ((nodes == null) || (nodes.length - deadNodes.size() < 1)) {
      throw new IOException("No live nodes contain current block");
    }
    DatanodeInfo chosenNode;
    do {
      chosenNode = nodes[ThreadLocalRandom.current().nextInt(nodes.length)];
    } while (deadNodes.contains(chosenNode));
    return chosenNode;
  }
}
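Putting the two halves together, the new ClientProtocol call plus copyBlock allow dumping a single block's bytes without going through a file path. A hedged sketch: the block coordinates and the surrounding method are placeholders, DFSClient#getNamenode() is the existing accessor for the ClientProtocol proxy, and the caller must sit in org.apache.hadoop.hdfs since copyBlock is package-private.

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical: fetch one block's locations by ID and stream its bytes out.
static void dumpBlock(DFSClient dfs, String poolId, long blockId,
    long genStamp, OutputStream out) throws Exception {
  ExtendedBlock blk = new ExtendedBlock(poolId, blockId, 0L, genStamp);
  LocatedBlock located = dfs.getNamenode().getBlockLocation(blk);
  if (located == null) {
    throw new IOException("Block " + blk + " not found");
  }
  // copyBlock verifies the length against the block the NameNode returned.
  dfs.copyBlock(located, out);
}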
ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
@@ -227,6 +228,8 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

import static org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.*;

/**
 * This class is used on the server side. Calls come across the wire for the
 * protocol {@link ClientNamenodeProtocolPB}.
@@ -378,6 +381,22 @@ public GetBlockLocationsResponseProto getBlockLocations(
    }
  }

  @Override
  public GetBlockLocationResponseProto getBlockLocation(
      RpcController controller, GetBlockLocationRequestProto request)
      throws ServiceException {
    try {
      LocatedBlock block =
          server.getBlockLocation(PBHelper.convert(request.getBlock()));
      GetBlockLocationResponseProto.Builder builder =
          GetBlockLocationResponseProto.newBuilder();
      if (block != null) {
        // Leave the optional field unset when the block is unknown.
        builder.setBlock(PBHelper.convert(block));
      }
      return builder.build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public GetServerDefaultsResponseProto getServerDefaults(
      RpcController controller, GetServerDefaultsRequestProto req)
ClientNamenodeProtocolTranslatorPB.java
@@ -77,6 +77,7 @@
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto;
@@ -98,6 +99,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -262,6 +264,22 @@ public LocatedBlocks getBlockLocations(String src, long offset, long length)
    }
  }

  @Override
  public LocatedBlock getBlockLocation(ExtendedBlock block) throws IOException {
    GetBlockLocationRequestProto req = GetBlockLocationRequestProto
        .newBuilder()
        .setBlock(PBHelper.convert(block))
        .build();
    try {
      GetBlockLocationResponseProto resp = rpcProxy.getBlockLocation(null, req);
      // The response's block field is optional; hasBlock() distinguishes a
      // missing block from a default proto instance.
      return resp.hasBlock() ? PBHelper.convert(resp.getBlock()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    GetServerDefaultsRequestProto req = VOID_GET_SERVER_DEFAULT_REQUEST;