Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public interface DBCheckpoint {
*/
long getLatestSequenceNumber();

/**
* Time taken in milliseconds for the checkpoint to be created.
*
* @return the checkpoint creation time, in milliseconds.
*/
long checkpointCreationTimeTaken();

/**
* Destroy the contents of the specified checkpoint to ensure
* proper cleanup of the footprint on disk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -41,7 +43,6 @@ public class RDBCheckpointManager {
public static final String RDB_CHECKPOINT_DIR_PREFIX = "rdb_checkpoint_";
private static final Logger LOG =
LoggerFactory.getLogger(RDBCheckpointManager.class);
public static final String JAVA_TMP_DIR = "java.io.tmpdir";
private String checkpointNamePrefix = "";

public RDBCheckpointManager(RocksDB rocksDB) {
Expand Down Expand Up @@ -79,12 +80,19 @@ public RocksDBCheckpoint createCheckpoint(String parentDir) {
checkpointDir += "_" + RDB_CHECKPOINT_DIR_PREFIX + currentTime;

Path checkpointPath = Paths.get(parentDir, checkpointDir);
Instant start = Instant.now();
checkpoint.createCheckpoint(checkpointPath.toString());
Instant end = Instant.now();

long duration = Duration.between(start, end).toMillis();
LOG.debug("Created checkpoint at " + checkpointPath.toString() + " in "
+ duration + " milliseconds");

return new RocksDBCheckpoint(
checkpointPath,
currentTime,
db.getLatestSequenceNumber()); //Best guesstimate here. Not accurate.
db.getLatestSequenceNumber(), //Best guesstimate here. Not accurate.
duration);

} catch (RocksDBException e) {
LOG.error("Unable to create RocksDB Snapshot.", e);
Expand All @@ -97,13 +105,16 @@ static class RocksDBCheckpoint implements DBCheckpoint {
private Path checkpointLocation;
private long checkpointTimestamp;
private long latestSequenceNumber;
private long checkpointCreationTimeTaken;

RocksDBCheckpoint(Path checkpointLocation,
long snapshotTimestamp,
long latestSequenceNumber) {
long latestSequenceNumber,
long checkpointCreationTimeTaken) {
this.checkpointLocation = checkpointLocation;
this.checkpointTimestamp = snapshotTimestamp;
this.latestSequenceNumber = latestSequenceNumber;
this.checkpointCreationTimeTaken = checkpointCreationTimeTaken;
}

@Override
Expand All @@ -121,8 +132,14 @@ public long getLatestSequenceNumber() {
return this.latestSequenceNumber;
}

/**
 * Reports how long this checkpoint took to create.
 *
 * @return the creation time, in milliseconds, that was supplied to the
 *         constructor of this checkpoint.
 */
@Override
public long checkpointCreationTimeTaken() {
  return this.checkpointCreationTimeTaken;
}

/**
 * Removes the checkpoint directory from disk.
 *
 * @throws IOException propagated from {@code FileUtils.deleteDirectory}
 *         when the directory cannot be deleted.
 */
@Override
public void cleanupCheckpoint() throws IOException {
  // Parameterized logging: the message is only formatted when debug
  // logging is actually enabled.
  LOG.debug("Cleaning up checkpoint at {}", checkpointLocation);
  FileUtils.deleteDirectory(checkpointLocation.toFile());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Matchers;

import static org.apache.hadoop.ozone.OzoneConsts.
OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Class used for testing the OM DB Checkpoint provider servlet.
*/
public class TestOMDbCheckpointServlet {
private MiniOzoneCluster cluster = null;
private OMMetrics omMetrics;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
private String omId;

@Rule
public Timeout timeout = new Timeout(60000);

/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@Before
public void init() throws Exception {
conf = new OzoneConfiguration();
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
cluster = MiniOzoneCluster.newBuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
.setOmId(omId)
.build();
cluster.waitForClusterToBeReady();
omMetrics = cluster.getOzoneManager().getMetrics();
}

/**
* Shutdown MiniDFSCluster.
*/
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}

@Test
public void testDoGet() throws ServletException, IOException {

File tempFile = null;
try {
OMDBCheckpointServlet omDbCheckpointServletMock =
mock(OMDBCheckpointServlet.class);

doCallRealMethod().when(omDbCheckpointServletMock).init();

HttpServletRequest requestMock = mock(HttpServletRequest.class);
HttpServletResponse responseMock = mock(HttpServletResponse.class);

ServletContext servletContextMock = mock(ServletContext.class);
when(omDbCheckpointServletMock.getServletContext())
.thenReturn(servletContextMock);

when(servletContextMock.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE))
.thenReturn(cluster.getOzoneManager());
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH))
.thenReturn("true");
doNothing().when(responseMock).setContentType("application/x-tgz");
doNothing().when(responseMock).setHeader(Matchers.anyString(),
Matchers.anyString());

tempFile = File.createTempFile("testDoGet_" + System
.currentTimeMillis(), ".tar.gz");

FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
when(responseMock.getOutputStream()).thenReturn(
new ServletOutputStream() {
@Override
public boolean isReady() {
return true;
}

@Override
public void setWriteListener(WriteListener writeListener) {
}

@Override
public void write(int b) throws IOException {
fileOutputStream.write(b);
}
});

doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock,
responseMock);

omDbCheckpointServletMock.init();

Assert.assertTrue(
omMetrics.getLastCheckpointCreationTimeTaken() == 0);
Assert.assertTrue(
omMetrics.getLastCheckpointTarOperationTimeTaken() == 0);
Assert.assertTrue(
omMetrics.getLastCheckpointStreamingTimeTaken() == 0);

omDbCheckpointServletMock.doGet(requestMock, responseMock);

Assert.assertTrue(tempFile.length() > 0);
Assert.assertTrue(
omMetrics.getLastCheckpointCreationTimeTaken() > 0);
Assert.assertTrue(
omMetrics.getLastCheckpointTarOperationTimeTaken() > 0);
Assert.assertTrue(
omMetrics.getLastCheckpointStreamingTimeTaken() > 0);
} finally {
FileUtils.deleteQuietly(tempFile);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
Expand Down Expand Up @@ -53,6 +55,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
private static final long serialVersionUID = 1L;

private transient DBStore omDbStore;
private transient OMMetrics omMetrics;
private transient DataTransferThrottler throttler = null;

@Override
Expand All @@ -67,6 +70,8 @@ public void init() throws ServletException {
}

omDbStore = om.getMetadataManager().getStore();
omMetrics = om.getMetrics();

OzoneConfiguration configuration = om.getConfiguration();
long transferBandwidth = configuration.getLongBytes(
OMConfigKeys.OZONE_DB_CHECKPOINT_TRANSFER_RATE_KEY,
Expand Down Expand Up @@ -112,19 +117,38 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
return;
}
omMetrics.setLastCheckpointCreationTimeTaken(
checkpoint.checkpointCreationTimeTaken());

Instant start = Instant.now();
checkPointTarFile = OmUtils.createTarFile(
checkpoint.getCheckpointLocation());
LOG.info("Tar location = " + checkPointTarFile.getAbsolutePath());
Instant end = Instant.now();

long duration = Duration.between(start, end).toMillis();
LOG.debug("Time taken to archive the checkpoint : " + duration +
" milliseconds");
LOG.info("Checkpoint Tar location = " +
checkPointTarFile.getAbsolutePath());
omMetrics.setLastCheckpointTarOperationTimeTaken(duration);

response.setContentType("application/x-tgz");
response.setHeader("Content-Disposition",
"attachment; filename=\"" +
checkPointTarFile.getName() + "\"");

checkpointFileInputStream = new FileInputStream(checkPointTarFile);
start = Instant.now();
TransferFsImage.copyFileToStream(response.getOutputStream(),
checkPointTarFile,
checkpointFileInputStream,
throttler);
end = Instant.now();

duration = Duration.between(start, end).toMillis();
LOG.debug("Time taken to write the checkpoint to response output " +
"stream: " + duration + " milliseconds");
omMetrics.setLastCheckpointStreamingTimeTaken(duration);

checkpoint.cleanupCheckpoint();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/**
* This class is for maintaining Ozone Manager statistics.
Expand Down Expand Up @@ -105,6 +106,10 @@ public class OMMetrics {
// few minutes before restart may not be included in this count.
private @Metric MutableCounterLong numKeys;

// Metrics to track checkpointing statistics from last run.
private @Metric MutableGaugeLong lastCheckpointCreationTimeTaken;
private @Metric MutableGaugeLong lastCheckpointTarOperationTimeTaken;
private @Metric MutableGaugeLong lastCheckpointStreamingTimeTaken;

public OMMetrics() {
}
Expand Down Expand Up @@ -390,6 +395,18 @@ public void incNumGetServiceListFails() {
numGetServiceListFails.incr();
}

/**
 * Records how long the most recent checkpoint took to create.
 *
 * @param val the new gauge value; the checkpoint servlet passes a
 *            duration in milliseconds.
 */
public void setLastCheckpointCreationTimeTaken(long val) {
  lastCheckpointCreationTimeTaken.set(val);
}

/**
 * Records how long the most recent checkpoint took to archive into a tar
 * file.
 *
 * @param val the new gauge value; the checkpoint servlet passes a
 *            duration in milliseconds.
 */
public void setLastCheckpointTarOperationTimeTaken(long val) {
  lastCheckpointTarOperationTimeTaken.set(val);
}

/**
 * Records how long the most recent checkpoint took to be written to the
 * response output stream.
 *
 * @param val the new gauge value; the checkpoint servlet passes a
 *            duration in milliseconds.
 */
public void setLastCheckpointStreamingTimeTaken(long val) {
  lastCheckpointStreamingTimeTaken.set(val);
}

@VisibleForTesting
public long getNumVolumeCreates() {
return numVolumeCreates.value();
Expand Down Expand Up @@ -606,6 +623,21 @@ public long getNumAbortMultipartUploadFails() {
return numAbortMultipartUploadFails.value();
}

/**
 * Returns the current value of the gauge tracking the last checkpoint
 * creation time.
 *
 * @return the value most recently stored via
 *         {@code setLastCheckpointCreationTimeTaken}.
 */
@VisibleForTesting
public long getLastCheckpointCreationTimeTaken() {
return lastCheckpointCreationTimeTaken.value();
}

/**
 * Returns the current value of the gauge tracking the last checkpoint tar
 * operation time.
 *
 * @return the value most recently stored via
 *         {@code setLastCheckpointTarOperationTimeTaken}.
 */
@VisibleForTesting
public long getLastCheckpointTarOperationTimeTaken() {
return lastCheckpointTarOperationTimeTaken.value();
}

/**
 * Returns the current value of the gauge tracking the last checkpoint
 * streaming time.
 *
 * @return the value most recently stored via
 *         {@code setLastCheckpointStreamingTimeTaken}.
 */
@VisibleForTesting
public long getLastCheckpointStreamingTimeTaken() {
return lastCheckpointStreamingTimeTaken.value();
}

public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
Expand Down