@@ -643,11 +643,11 @@ void setAccessToken(Token<BlockTokenIdentifier> t) {
     this.accessToken = t;
   }

-  private void setPipeline(LocatedBlock lb) {
+  protected void setPipeline(LocatedBlock lb) {
     setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
   }

-  private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
+  protected void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
       String[] storageIDs) {
     synchronized (nodesLock) {
       this.nodes = nodes;
@@ -748,7 +748,7 @@ public void run() {

         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
           LOG.debug("Allocating new block: {}", this);
-          setPipeline(nextBlockOutputStream());
+          setupPipelineForCreate();
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
           LOG.debug("Append to block {}", block);
@@ -966,8 +966,8 @@ void waitForAckedSeqno(long seqno) throws IOException {
           long duration = Time.monotonicNowNanos() - begin;
           if (TimeUnit.NANOSECONDS.toMillis(duration) > writeTimeout) {
             LOG.error("No ack received, took {}ms (threshold={}ms). "
-                + "File being written: {}, block: {}, "
-                + "Write pipeline datanodes: {}.",
+                + "File being written: {}, block: {}, "
+                + "Write pipeline datanodes: {}.",
                 TimeUnit.NANOSECONDS.toMillis(duration), writeTimeout, src, block, nodes);
             throw new InterruptedIOException("No ack received after " +
                 TimeUnit.NANOSECONDS.toSeconds(duration) + "s and a timeout of " +
@@ -1525,8 +1525,8 @@ private void addDatanode2ExistingPipeline() throws IOException {
     // MIN_REPLICATION is set to 0 or less than zero, an exception will be
     // thrown if a replacement could not be found.

-    if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length
-        >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
+    if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+        nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
       DFSClient.LOG.warn(
           "Failed to find a new datanode to add to the write pipeline,"
           + " continue to write to the pipeline with " + nodes.length
@@ -1607,8 +1607,11 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
    * it can be written to.
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
+   *
+   * Returns boolean whether pipeline was setup successfully or not.
+   * This boolean is used upstream on whether to continue creating pipeline or throw exception
    */
-  private void setupPipelineForAppendOrRecovery() throws IOException {
+  private boolean setupPipelineForAppendOrRecovery() throws IOException {
     // Check number of datanodes. Note that if there is no healthy datanode,
     // this must be internal error because we mark external error in striped
     // outputstream only when all the streamers are in the DATA_STREAMING stage
@@ -1618,24 +1621,33 @@ private void setupPipelineInternal(DatanodeInfo[] datanodes,
       LOG.warn(msg);
       lastException.set(new IOException(msg));
       streamerClosed = true;
-      return;
+      return false;
     }
-    setupPipelineInternal(nodes, storageTypes, storageIDs);
+    return setupPipelineInternal(nodes, storageTypes, storageIDs);
   }

-  protected void setupPipelineInternal(DatanodeInfo[] datanodes,
+  protected boolean setupPipelineInternal(DatanodeInfo[] datanodes,
       StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
       throws IOException {
     boolean success = false;
     long newGS = 0L;
+    boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
-        return;
+        return false;
       }

+      final boolean isRecovery = errorState.hasInternalError() && !isCreateStage;
+
+      // During create stage, if we remove a node (nodes.length - 1)
+      // min replication should still be satisfied.
+      if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+          nodes.length - 1 >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+        return false;
+      }
+
-      final boolean isRecovery = errorState.hasInternalError();
       if (!handleBadDatanode()) {
-        return;
+        return false;
       }

       handleDatanodeReplacement();
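Taken together, the hunks above (apparently the HDFS client's DataStreamer) turn setupPipelineForAppendOrRecovery and setupPipelineInternal from void methods into methods that report whether a pipeline was actually rebuilt, and they make the retry loop aware of the create stage: the new isCreateStage flag keeps a create-time rebuild from being treated as recovery of an existing pipeline, and the rebuild is only attempted if dropping the bad node still satisfies the configured minimum replication. The following is a minimal, self-contained sketch of that control flow; the class, parameters, and stub methods (PipelineRetryModel, liveNodes, minReplication, and so on) are illustrative stand-ins, not the real DataStreamer API.

```java
/**
 * Simplified model of the reworked retry loop (hypothetical names, not the
 * real DataStreamer API). It illustrates the early-exit cases that now
 * return false instead of silently returning.
 */
public class PipelineRetryModel {

  enum Stage { CREATE, APPEND }

  static boolean setupPipelineInternal(Stage stage, int liveNodes,
      int minReplication, boolean hadInternalError) {
    boolean success = false;
    boolean isCreateStage = stage == Stage.CREATE;
    int attempts = 3; // stand-in for the streamerClosed / clientRunning checks

    while (!success && attempts-- > 0) {
      // 1) A restarting datanode that never comes back aborts the rebuild.
      if (!handleRestartingDatanode()) {
        return false;
      }
      // A create-stage rebuild is not treated as recovery of an existing pipeline.
      boolean isRecovery = hadInternalError && !isCreateStage;

      // 2) During create, dropping the bad node must still satisfy min replication,
      //    otherwise the caller should abandon the block instead of retrying here.
      if (isCreateStage && !(minReplication > 0 && liveNodes - 1 >= minReplication)) {
        return false;
      }
      // 3) If the bad datanode cannot be removed, give up as well.
      if (!handleBadDatanode()) {
        return false;
      }
      success = createBlockOutputStream(isRecovery);
    }
    return success;
  }

  // Stubs so the sketch compiles; the real methods live in DataStreamer.
  static boolean handleRestartingDatanode() { return true; }
  static boolean handleBadDatanode() { return true; }
  static boolean createBlockOutputStream(boolean isRecovery) { return true; }

  public static void main(String[] args) {
    // With 3 live nodes and minReplication 2, a create-stage retry may proceed.
    System.out.println(setupPipelineInternal(Stage.CREATE, 3, 2, true));
    // With 2 live nodes, removing one would violate minReplication 2: returns false.
    System.out.println(setupPipelineInternal(Stage.CREATE, 2, 2, true));
  }
}
```

The point the sketch mirrors is that every early exit now returns false, so the caller can decide whether to abandon the block or surface an exception instead of continuing blindly.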
@@ -1645,6 +1657,10 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();

+      if (isCreateStage) {
+        block.setCurrentBlock(lb.getBlock());
+      }
+
       // set up the pipeline again with the remaining nodes
       success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
           isRecovery);
@@ -1657,6 +1673,7 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
     if (success) {
       updatePipeline(newGS);
     }
+    return success;
   }

   /**
@@ -1795,7 +1812,7 @@ DatanodeInfo[] getExcludedNodes() {
    * Must get block ID and the IDs of the destinations from the namenode.
    * Returns the list of target datanodes.
    */
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected LocatedBlock setupPipelineForCreate() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
     StorageType[] nextStorageTypes;
@@ -1806,7 +1823,6 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
     do {
       errorState.resetInternalError();
       lastException.clear();
-
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
           excluded.length > 0 ? excluded : null, oldBlock);
@@ -1817,10 +1833,10 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       nodes = lb.getLocations();
       nextStorageTypes = lb.getStorageTypes();
       nextStorageIDs = lb.getStorageIDs();
-
+      setPipeline(lb);
       // Connect to first DataNode in the list.
       success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
-          0L, false);
+          0L, false) || setupPipelineForAppendOrRecovery();

       if (!success) {
         LOG.warn("Abandoning " + block);
@@ -1830,6 +1846,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
         final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
+        setPipeline(null, null, null);
       }
     } while (!success && --count >= 0);

@@ -1854,10 +1871,8 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes,
     if (LOG.isDebugEnabled()) {
       LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
     }
-
     // persist blocks on namenode on next flush
     persistBlocks.set(true);
-
     int refetchEncryptionKey = 1;
     while (true) {
       boolean result = false;
@@ -2259,4 +2274,4 @@ public String toString() {
     return extendedBlock == null ?
         "block==null" : "" + extendedBlock.getLocalBlock();
   }
-}
+}
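This appears to be the end of the first file in the diff, presumably DataStreamer. The other notable change above: nextBlockOutputStream has become setupPipelineForCreate, it now installs the pipeline itself via setPipeline(lb) before the first connection attempt, and when that connection fails it falls back to setupPipelineForAppendOrRecovery() before abandoning the block; because || short-circuits, the recovery path only runs when the initial connection fails. When the block is abandoned, setPipeline(null, null, null) clears the cached node list so a stale pipeline is not reused. Below is a minimal sketch of that retry shape, with hypothetical names and hard-coded outcomes rather than the real code. The hunks that follow appear to apply the same changes to the erasure-coded write path, presumably StripedDataStreamer.

```java
/**
 * Self-contained sketch of the new create-path retry shape (hypothetical
 * names; not the real DataStreamer code). The behaviour it models: a failed
 * first connection no longer abandons the block immediately, because the
 * streamer first tries an append/recovery-style rebuild of the pipeline.
 */
public class CreateRetrySketch {

  static boolean connectToFirstDatanode(int attempt) {
    return attempt >= 2; // pretend the first two attempts fail
  }

  static boolean recoverPipeline() {
    return false; // pretend an in-place rebuild is not possible either
  }

  static boolean setupPipelineForCreate(int maxRetries) {
    boolean success = false;
    int attempt = 0;
    do {
      // The real code would allocate a block, locate target datanodes, and
      // install the pipeline (setPipeline(lb)) before connecting.
      success = connectToFirstDatanode(attempt) || recoverPipeline();
      if (!success) {
        // The real code abandons the block, excludes the bad node, and resets
        // the pipeline (setPipeline(null, null, null)); the sketch just counts.
        attempt++;
      }
    } while (!success && attempt < maxRetries);
    return success;
  }

  public static void main(String[] args) {
    System.out.println(setupPipelineForCreate(3)); // true on the third attempt
  }
}
```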
@@ -90,7 +90,7 @@ private LocatedBlock getFollowingBlock() throws IOException {
   }

   @Override
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected LocatedBlock setupPipelineForCreate() throws IOException {
     boolean success;
     LocatedBlock lb = getFollowingBlock();
     block.setCurrentBlock(lb.getBlock());
@@ -101,7 +101,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
     DatanodeInfo[] nodes = lb.getLocations();
     StorageType[] storageTypes = lb.getStorageTypes();
     String[] storageIDs = lb.getStorageIDs();
-
+    setPipeline(lb);
     // Connect to the DataNode. If fail the internal error state will be set.
     success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
         false);
@@ -111,6 +111,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
       LOG.warn("Excluding datanode " + badNode);
       excludedNodes.put(badNode, badNode);
+      setPipeline(null, null, null);
       throw new IOException("Unable to create new block." + this);
     }
     return lb;
@@ -122,18 +123,18 @@ LocatedBlock peekFollowingBlock() {
   }

   @Override
-  protected void setupPipelineInternal(DatanodeInfo[] nodes,
+  protected boolean setupPipelineInternal(DatanodeInfo[] nodes,
       StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
       throws IOException {
     boolean success = false;
     while (!success && !streamerClosed() && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
-        return;
+        return false;
       }
       if (!handleBadDatanode()) {
         // for striped streamer if it is datanode error then close the stream
         // and return. no need to replace datanode
-        return;
+        return false;
       }

       // get a new generation stamp and an access token
@@ -179,6 +180,7 @@ assert getErrorState().hasExternalError()
         setStreamerAsClosed();
       }
     } // while
+    return success;
   }

   void setExternalError() {
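In the striped streamer the pattern matches the replicated one: the renamed setupPipelineForCreate installs the pipeline before connecting and resets it with setPipeline(null, null, null) on failure, while setupPipelineInternal now returns false instead of returning silently when a restarting or bad datanode cannot be handled (striped writers do not replace datanodes; a failed internal streamer is dealt with at the stripe level). The small sketch below, with hypothetical names rather than the StripedDataStreamer API, only illustrates why a boolean result is more useful to a caller than a silent return. The next hunk appears to be on the datanode side, presumably BlockReceiver.

```java
/**
 * Minimal sketch (hypothetical names) of how a boolean result from a
 * pipeline-setup attempt can be consumed: a rebuilt pipeline keeps streaming,
 * a failed one surfaces an error instead of being silently ignored.
 */
public class StreamerOutcomeSketch {

  static void handleOutcome(boolean pipelineRebuilt) throws java.io.IOException {
    if (!pipelineRebuilt) {
      // Previously a void return hid this case; now the caller can decide
      // whether to throw or to abandon the current block and try a new one.
      throw new java.io.IOException("Unable to set up write pipeline");
    }
    // Otherwise: continue streaming packets on the rebuilt pipeline.
  }

  public static void main(String[] args) {
    try {
      handleOutcome(false);
    } catch (java.io.IOException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```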
@@ -218,7 +218,10 @@ class BlockReceiver implements Closeable {
       switch (stage) {
       case PIPELINE_SETUP_CREATE:
         replicaHandler = datanode.data.createRbw(storageType, storageId,
-            block, allowLazyPersist);
+            block, allowLazyPersist, newGs);
+        if (newGs != 0L) {
+          block.setGenerationStamp(newGs);
+        }
         datanode.notifyNamenodeReceivingBlock(
             block, replicaHandler.getReplica().getStorageUuid());
         break;
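On the datanode, the generation stamp carried by the write request (newGs, previously relevant only to append and recovery stages) is now also forwarded to createRbw for PIPELINE_SETUP_CREATE, and a non-zero value bumps the in-memory block's generation stamp. That allows a client to retry block creation on the same datanode with a newer generation stamp without colliding with the replica left over from the failed attempt. The tiny model below shows just that decision; Block here is a hypothetical stand-in, not the HDFS ExtendedBlock or BlockReceiver API. The remaining hunks appear to add the matching createRbw overload to the dataset layer, presumably FsDatasetSpi and FsDatasetImpl.

```java
/**
 * Tiny model of the generation-stamp handling added to the create path
 * (plain Java, hypothetical Block type; not the BlockReceiver API).
 */
public class GenStampSketch {

  static final class Block {
    long genStamp;
    Block(long genStamp) { this.genStamp = genStamp; }
  }

  // Mirrors the patch: a non-zero newGs means "this is a retried create with a
  // bumped generation stamp", so the block being written adopts it.
  static void applyNewGenStamp(Block block, long newGs) {
    if (newGs != 0L) {
      block.genStamp = newGs;
    }
  }

  public static void main(String[] args) {
    Block b = new Block(1001L);
    applyNewGenStamp(b, 0L);        // plain create: generation stamp unchanged
    applyNewGenStamp(b, 1002L);     // retried create: adopt the newer stamp
    System.out.println(b.genStamp); // 1002
  }
}
```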
@@ -335,6 +335,16 @@ ReplicaHandler createTemporary(StorageType storageType, String storageId,
   ReplicaHandler createRbw(StorageType storageType, String storageId,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;

+  /**
+   * Creates a RBW replica and returns the meta info of the replica
+   *
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  ReplicaHandler createRbw(StorageType storageType, String storageId,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;
+
   /**
    * Recovers a RBW replica and returns the meta info of the replica.
    *
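The dataset interface (apparently FsDatasetSpi) gains a five-argument createRbw overload that also accepts the updated generation stamp. Passing newGS == 0L is meant to preserve the old behaviour, which is how the implementation change below wires it up: the historical four-argument form delegates with 0L. The fragment below sketches that delegation pattern on a trimmed-down, hypothetical interface; the real FsDatasetSpi has many more parameters and performs the delegation in FsDatasetImpl rather than with a default method.

```java
import java.io.IOException;

/**
 * Trimmed-down sketch of the overload pattern (hypothetical interface; not
 * the real FsDatasetSpi). The old signature keeps its historical behaviour
 * by delegating with newGS == 0L, meaning "no updated generation stamp".
 */
interface RbwDatasetSketch {

  // New form: a non-zero newGS signals a retried create with a bumped GS.
  Object createRbw(String blockPoolId, long blockId, long newGS) throws IOException;

  // Old form preserved for existing callers: with 0L an existing replica is
  // still an error (ReplicaAlreadyExistsException in the real implementation).
  default Object createRbw(String blockPoolId, long blockId) throws IOException {
    return createRbw(blockPoolId, blockId, 0L);
  }
}
```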
@@ -1587,15 +1587,29 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
   public ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
+    return createRbw(storageType, storageId, b, allowLazyPersist, 0L);
+  }
+
+  @Override // FsDatasetSpi
+  public ReplicaHandler createRbw(
+      StorageType storageType, String storageId, ExtendedBlock b,
+      boolean allowLazyPersist, long newGS) throws IOException {
     long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
         b.getBlockPoolId())) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
-        throw new ReplicaAlreadyExistsException("Block " + b +
-            " already exists in state " + replicaInfo.getState() +
-            " and thus cannot be created.");
+        // In case of retries with same blockPoolId + blockId as before
+        // with updated GS, cleanup the old replica to avoid
+        // any multiple copies with same blockPoolId + blockId
+        if (newGS != 0L) {
+          cleanupReplica(b.getBlockPoolId(), replicaInfo);
+        } else {
+          throw new ReplicaAlreadyExistsException("Block " + b +
+              " already exists in state " + replicaInfo.getState() +
+              " and thus cannot be created.");
+        }
       }
       // create a new block
       FsVolumeReference ref = null;
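Finally, the implementation (apparently FsDatasetImpl) distinguishes a genuine duplicate from a client retry: when a replica with the same block pool ID and block ID already exists and the caller supplied a non-zero newGS, the stale replica is cleaned up (the patch calls a cleanupReplica helper) and creation proceeds; with newGS == 0L the old ReplicaAlreadyExistsException behaviour is unchanged. The self-contained model below reproduces that decision with a plain map; the types and names are illustrative, not the real volumeMap/ReplicaInfo machinery.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Self-contained model of the new createRbw decision (hypothetical types and
 * names; not the real FsDatasetImpl). A retry with an updated generation
 * stamp replaces the stale RBW replica instead of failing with "already exists".
 */
public class CreateRbwRetryModel {

  static final class Replica {
    final long blockId;
    final long genStamp;
    Replica(long blockId, long genStamp) { this.blockId = blockId; this.genStamp = genStamp; }
    @Override public String toString() { return "blockId=" + blockId + " gs=" + genStamp; }
  }

  private final Map<Long, Replica> replicaMap = new HashMap<>();

  Replica createRbw(long blockId, long genStamp, long newGS) {
    Replica existing = replicaMap.get(blockId);
    if (existing != null) {
      if (newGS != 0L) {
        // Retried create with a bumped GS: drop the stale replica first.
        replicaMap.remove(blockId);
      } else {
        // Plain duplicate: keep the old behaviour and reject the request.
        throw new IllegalStateException("Block " + blockId + " already exists");
      }
    }
    Replica created = new Replica(blockId, newGS != 0L ? newGS : genStamp);
    replicaMap.put(blockId, created);
    return created;
  }

  public static void main(String[] args) {
    CreateRbwRetryModel dataset = new CreateRbwRetryModel();
    System.out.println(dataset.createRbw(42L, 1001L, 0L));    // first attempt
    System.out.println(dataset.createRbw(42L, 1001L, 1002L)); // retry with new GS succeeds
    try {
      dataset.createRbw(42L, 1001L, 0L);                      // plain duplicate still fails
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```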