Dispatcher.java
@@ -398,6 +398,7 @@ private void dispatch() {
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this, e);
nnc.getBlocksFailed().incrementAndGet();
target.getDDatanode().setHasFailure();
// Check that the failure is due to block pinning errors.
if (e instanceof BlockPinningException) {
NameNodeConnector.java
@@ -163,6 +163,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
private final List<Path> targetPaths;
private final AtomicLong bytesMoved = new AtomicLong();
private final AtomicLong blocksMoved = new AtomicLong();
private final AtomicLong blocksFailed = new AtomicLong();

private final int maxNotChangedIterations;
private int notChangedIterations = 0;
@@ -230,14 +231,18 @@ public String getBlockpoolID() {
return blockpoolID;
}

AtomicLong getBytesMoved() {
public AtomicLong getBytesMoved() {
return bytesMoved;
}

AtomicLong getBlocksMoved() {
public AtomicLong getBlocksMoved() {
return blocksMoved;
}

public AtomicLong getBlocksFailed() {
return blocksFailed;
}

public void addBytesMoved(long numBytes) {
bytesMoved.addAndGet(numBytes);
blocksMoved.incrementAndGet();
Mover.java
@@ -42,6 +42,8 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtil;
@@ -118,6 +120,8 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
private final int retryMaxAttempts;
private final AtomicInteger retryCount;
private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
private final MoverMetrics metrics;
private final NameNodeConnector nnc;

private final BlockStoragePolicy[] blockStoragePolicies;

@@ -155,6 +159,8 @@ Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
BlockStoragePolicySuite.ID_BIT_LENGTH];
this.excludedPinnedBlocks = excludedPinnedBlocks;
this.nnc = nnc;
this.metrics = MoverMetrics.create(this);
}

void init() throws IOException {
@@ -196,6 +202,10 @@ private ExitStatus run() {
}
}

public NameNodeConnector getNnc() {
return nnc;
}

DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
ErasureCodingPolicy ecPolicy) {
Block blk = lb.getBlock().getLocalBlock();
@@ -296,6 +306,7 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException {
* round
*/
private Result processNamespace() throws IOException {
metrics.setProcessingNamespace(true);
getSnapshottableDirs();
Result result = new Result();
for (Path target : targetPaths) {
@@ -322,6 +333,7 @@ private Result processNamespace() throws IOException {
retryCount.set(0);
}
result.updateHasRemaining(hasFailed);
metrics.setProcessingNamespace(false);
return result;
}

@@ -374,6 +386,7 @@ private void processRecursively(String parent, HdfsFileStatus status,
// the full path is a snapshot path but it is also included in the
// current directory tree, thus ignore it.
processFile(fullPath, (HdfsLocatedFileStatus) status, result);
metrics.incrFilesProcessed();
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
@@ -521,6 +534,7 @@ boolean chooseTargetInSameNode(DBlock db, Source source,
final PendingMove pm = source.addPendingMove(db, target);
if (pm != null) {
dispatcher.executePendingMove(pm);
metrics.incrBlocksScheduled();
return true;
}
}
@@ -539,6 +553,7 @@ boolean chooseTarget(DBlock db, Source source,
final PendingMove pm = source.addPendingMove(db, target);
if (pm != null) {
dispatcher.executePendingMove(pm);
metrics.incrBlocksScheduled();
return true;
}
}
@@ -650,6 +665,11 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
LOG.info("namenodes = " + namenodes);

DefaultMetricsSystem.initialize("Mover");

Review comment (Contributor):
Does Balancer have similar metrics? If not, shall we use this PR or create a new one to do that?

Reply (@LeonGao91, Contributor, Author, Mar 8, 2021):
We have it in our internal branch with a similar implementation, but it is not upstream yet.
Actually, there has been a JIRA for Balancer metrics for a long time: https://issues.apache.org/jira/browse/HDFS-10648
I can rebase our changes there as a separate PR after this.

JvmMetrics.create("Mover",
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
DefaultMetricsSystem.instance());

checkKeytabAndInit(conf);
List<NameNodeConnector> connectors = Collections.emptyList();
try {
@@ -818,6 +838,7 @@ public int run(String[] args) throws Exception {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
} finally {
DefaultMetricsSystem.shutdown();
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime));
}
MoverMetrics.java (new file)
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.mover;

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

/**
* Metrics for HDFS Mover of a blockpool.
*/
@Metrics(about="Mover metrics", context="dfs")
final class MoverMetrics {

private final Mover mover;

@Metric("If mover is processing namespace.")
private MutableGaugeInt processingNamespace;

@Metric("Number of blocks being scheduled.")
private MutableCounterLong blocksScheduled;

@Metric("Number of files being processed.")
private MutableCounterLong filesProcessed;

private MoverMetrics(Mover m) {
this.mover = m;
}

public static MoverMetrics create(Mover mover) {
MoverMetrics m = new MoverMetrics(mover);
return DefaultMetricsSystem.instance().register(
m.getName(), null, m);
}

String getName() {
return "Mover-" + mover.getNnc().getBlockpoolID();
}

@Metric("Bytes that already moved by mover.")
public long getBytesMoved() {
return mover.getNnc().getBytesMoved().get();
}

@Metric("Number of blocks that successfully moved by mover.")
public long getBlocksMoved() {
return mover.getNnc().getBlocksMoved().get();
}

@Metric("Number of blocks that failed moved by mover.")
public long getBlocksFailed() {
return mover.getNnc().getBlocksFailed().get();
}

void setProcessingNamespace(boolean processingNamespace) {
this.processingNamespace.set(processingNamespace ? 1 : 0);
}

void incrBlocksScheduled() {
this.blocksScheduled.incr();
}

void incrFilesProcessed() {
this.filesProcessed.incr();
}
}
package-info.java (new file)
@@ -0,0 +1,27 @@

Review comment (Contributor):
Do we need to add this file?

Reply (Contributor, Author):
It is to pass checkstyle, since I added a new file to this package without a package-info.

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Mover is a data migration tool for tiered storage.
* It scans provided paths in HDFS to check
* if the block placement satisfies the storage policy.
* For the blocks violating the storage policy,
* it moves the replicas to a different storage type
* in order to fulfill the storage policy requirement.
*/
package org.apache.hadoop.hdfs.server.mover;
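
Note (orientation only, not part of this diff): the workflow the package description above refers to, tagging a path with a colder storage policy and letting the Mover migrate its replicas, can be driven programmatically along the lines of the sketch below. It mirrors the test added later in this PR; the driver class name and the "/cold-data" path are made-up examples, and the class sits in the mover package only because Mover.run(Map, Configuration) is package-private.

// Hypothetical driver, for illustration only.
package org.apache.hadoop.hdfs.server.mover;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class MoverColdPathExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    FileSystem fs = FileSystem.get(conf);

    // Tag a directory with the COLD policy; its DISK replicas now violate placement.
    Path dir = new Path("/cold-data");
    fs.setStoragePolicy(dir, "COLD");

    // Ask the Mover to scan only this path on the (single) namenode.
    Map<URI, List<Path>> nnWithPath = new HashMap<>();
    nnWithPath.put(DFSUtil.getInternalNsRpcUris(conf).iterator().next(),
        Collections.singletonList(dir));
    System.exit(Mover.run(nnWithPath, conf));
  }
}
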
TestMover.java
@@ -36,6 +36,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
@@ -86,12 +88,15 @@
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
@@ -1235,6 +1240,58 @@ public void testMoverWhenStoragePolicyUnset() throws Exception {
}
}

@Test(timeout=100000)
public void testMoverMetrics() throws Exception {
long blockSize = 10*1024*1024;
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);

final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
.build();

cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();

final String file = "/testMaxIterationTime.dat";
final Path path = new Path(file);
short repFactor = 1;
int seed = 0xFAFAFA;
// write to DISK
DFSTestUtil.createFile(fs, path, 4L * blockSize, repFactor, seed);

// move to ARCHIVE
fs.setStoragePolicy(new Path(file), "COLD");

Map<URI, List<Path>> nnWithPath = new HashMap<>();
List<Path> paths = new ArrayList<>();
paths.add(path);
nnWithPath
.put(DFSUtil.getInternalNsRpcUris(conf).iterator().next(), paths);

Mover.run(nnWithPath, conf);

final String moverMetricsName = "Mover-"
+ cluster.getNameNode(0).getNamesystem().getBlockPoolId();
MetricsSource moverMetrics =
DefaultMetricsSystem.instance().getSource(moverMetricsName);
assertNotNull(moverMetrics);

MetricsRecordBuilder rb = MetricsAsserts.getMetrics(moverMetricsName);
// Check metrics
assertEquals(4, MetricsAsserts.getLongCounter("BlocksScheduled", rb));
assertEquals(1, MetricsAsserts.getLongCounter("FilesProcessed", rb));
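// 4 blocks of 10 MiB each (blockSize) were moved: 4 * 10 * 1024 * 1024 = 41943040 bytes.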
assertEquals(41943040, MetricsAsserts.getLongGauge("BytesMoved", rb));
assertEquals(4, MetricsAsserts.getLongGauge("BlocksMoved", rb));
assertEquals(0, MetricsAsserts.getLongGauge("BlocksFailed", rb));
}

private void createFileWithFavoredDatanodes(final Configuration conf,
final MiniDFSCluster cluster, final DistributedFileSystem dfs)
throws IOException {