Merged
@@ -119,6 +119,7 @@ public final class OzoneConsts {
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
public static final String OM_DB_CHECKPOINTS_DIR_NAME = "om.db.checkpoints";
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
public static final String SCM_DB_NAME = "scm.db";
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1630,6 +1630,14 @@
<description>Byte limit for Raft's Log Worker queue.
</description>
</property>
<property>
<name>ozone.om.ratis.log.purge.gap</name>
<value>1000000</value>
<tag>OZONE, OM, RATIS</tag>
<description>The minimum gap between log indices for Raft server to purge
Contributor

Does this mean we will snapshot every 1024 transactions?

Contributor Author

No. When a snapshot is taken, the logs are purged only if the gap since the last purge is more than 1024. Snapshot frequency does not depend on this setting.
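
The purge rule described in this reply can be sketched as follows. This is a minimal illustration of the described behavior, not Ratis code: the class `PurgeGapSketch`, the method `shouldPurge`, and the choice to treat the configured gap as an inclusive minimum are all assumptions made for the example.

```java
/** Hypothetical sketch of the purge rule described above; not the Ratis implementation. */
public class PurgeGapSketch {

  /**
   * Decides whether log segments may be purged when a snapshot is taken.
   *
   * @param lastPurgedIndex log index up to which segments were last purged
   * @param snapshotIndex   log index covered by the snapshot being taken
   * @param purgeGap        configured minimum gap (ozone.om.ratis.log.purge.gap)
   * @return true if the distance since the last purge has reached the gap
   */
  public static boolean shouldPurge(long lastPurgedIndex, long snapshotIndex,
      long purgeGap) {
    // Assumption: the gap is an inclusive minimum, per the property description.
    return snapshotIndex - lastPurgedIndex >= purgeGap;
  }

  public static void main(String[] args) {
    long purgeGap = 1_000_000L; // default value proposed in this PR
    System.out.println(shouldPurge(0L, 2_000L, purgeGap));     // prints false
    System.out.println(shouldPurge(0L, 1_500_000L, purgeGap)); // prints true
  }
}
```

Note that the snapshot itself is taken in both cases; only the purge decision depends on the gap.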

Contributor

Let's set this to a higher value. We don't need to be too aggressive about purging Ratis logs.

Contributor

1024 transactions is 100ms worth of edits in a busy cluster. We could set this as high as 1M maybe to keep more history. :)
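
To make this estimate concrete: if 1024 transactions correspond to roughly 100 ms of edits, the implied rate is about 10,240 transactions per second, and a gap of 1M log entries retains roughly 98 seconds of history at that rate. The sketch below only reproduces this arithmetic. The class and method names are hypothetical, and the rate is the reviewer's rough figure, not a measurement.

```java
/** Hypothetical helper that reproduces the back-of-the-envelope estimate above. */
public class PurgeGapHistory {

  /** Converts "N transactions per window" into transactions per second. */
  public static long transactionsPerSecond(long transactions, long windowMillis) {
    return transactions * 1000L / windowMillis;
  }

  /** Seconds of log history retained by a purge gap at a given transaction rate. */
  public static double secondsOfHistory(long purgeGap, long txPerSecond) {
    return (double) purgeGap / txPerSecond;
  }

  public static void main(String[] args) {
    long rate = transactionsPerSecond(1024L, 100L);
    System.out.println(rate);                               // prints 10240
    System.out.println(secondsOfHistory(1_000_000L, rate)); // prints 97.65625
  }
}
```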

Contributor Author

Agree. Will update it.

Contributor Author

Agree. Will update it.

its log segments after taking snapshot.
</description>
</property>

<property>
<name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
@@ -123,6 +123,9 @@ private OMConfigKeys() {
"ozone.om.ratis.log.appender.queue.byte-limit";
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String OZONE_OM_RATIS_LOG_PURGE_GAP =
"ozone.om.ratis.log.purge.gap";
public static final int OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;

Contributor

Can we please use the new format for configs? Here are some examples: https://cwiki.apache.org/confluence/display/HADOOP/Java-based+configuration+API

Contributor

Good suggestion! Let me file a follow-up Jira to fix that. I want to get this patch committed today; it's been hanging around for over a month.

Contributor

Filed HDDS-1831.

// OM Snapshot configurations
public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
@@ -203,7 +203,8 @@ public enum ResultCodes {

PREFIX_NOT_FOUND,

- S3_BUCKET_INVALID_LENGTH
+ S3_BUCKET_INVALID_LENGTH,
+
+ RATIS_ERROR // Error in Ratis server
}
}
@@ -29,9 +29,10 @@ public interface OzoneManagerHAProtocol {
/**
* Store the snapshot index i.e. the raft log index, corresponding to the
* last transaction applied to the OM RocksDB, in OM metadata dir on disk.
* @param flush flush the OM DB to disk if true
* @return the snapshot index
* @throws IOException
*/
- long saveRatisSnapshot() throws IOException;
+ long saveRatisSnapshot(boolean flush) throws IOException;

}
2 changes: 2 additions & 0 deletions hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -278,6 +278,8 @@ enum Status {
PREFIX_NOT_FOUND=50;

S3_BUCKET_INVALID_LENGTH = 51; // s3 bucket invalid length.

RATIS_ERROR = 52;
}


@@ -241,6 +241,7 @@ abstract class Builder {
protected String clusterId;
protected String omServiceId;
protected int numOfOMs;
protected int numOfActiveOMs;

protected Optional<Boolean> enableTrace = Optional.of(false);
protected Optional<Integer> hbInterval = Optional.empty();
@@ -440,6 +441,11 @@ public Builder setNumOfOzoneManagers(int numOMs) {
return this;
}

public Builder setNumOfActiveOMs(int numActiveOMs) {
this.numOfActiveOMs = numActiveOMs;
return this;
}

public Builder setStreamBufferSizeUnit(StorageUnit unit) {
this.streamBufferSizeUnit = Optional.of(unit);
return this;
@@ -53,6 +53,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private Map<String, OzoneManager> ozoneManagerMap;
private List<OzoneManager> ozoneManagers;

// Active OMs denote OMs which are up and running
private List<OzoneManager> activeOMs;
private List<OzoneManager> inactiveOMs;

private static final Random RANDOM = new Random();
private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 second

@@ -67,11 +71,15 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
Map<String, OzoneManager> omMap,
List<OzoneManager> activeOMList,
List<OzoneManager> inactiveOMList,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
super(conf, scm, hddsDatanodes);
this.ozoneManagerMap = omMap;
this.ozoneManagers = new ArrayList<>(omMap.values());
this.activeOMs = activeOMList;
this.inactiveOMs = inactiveOMList;
}

/**
@@ -83,6 +91,10 @@ public OzoneManager getOzoneManager() {
return this.ozoneManagers.get(0);
}

public boolean isOMActive(String omNodeId) {
return activeOMs.contains(ozoneManagerMap.get(omNodeId));
}

public OzoneManager getOzoneManager(int index) {
return this.ozoneManagers.get(index);
}
@@ -91,6 +103,20 @@ public OzoneManager getOzoneManager(String omNodeId) {
return this.ozoneManagerMap.get(omNodeId);
}

/**
* Start a previously inactive OM.
*/
public void startInactiveOM(String omNodeID) throws IOException {
OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID);
if (!inactiveOMs.contains(ozoneManager)) {
throw new IOException("OM is already active.");
} else {
ozoneManager.start();
activeOMs.add(ozoneManager);
inactiveOMs.remove(ozoneManager);
}
}

@Override
public void restartOzoneManager() throws IOException {
for (OzoneManager ozoneManager : ozoneManagers) {
@@ -125,6 +151,8 @@ public void stopOzoneManager(String omNodeId) {
public static class Builder extends MiniOzoneClusterImpl.Builder {

private final String nodeIdBaseStr = "omNode-";
private List<OzoneManager> activeOMs = new ArrayList<>();
private List<OzoneManager> inactiveOMs = new ArrayList<>();

/**
* Creates a new Builder.
@@ -137,6 +165,10 @@ public Builder(OzoneConfiguration conf) {

@Override
public MiniOzoneCluster build() throws IOException {
if (numOfActiveOMs > numOfOMs) {
throw new IllegalArgumentException("Number of active OMs cannot be " +
"more than the total number of OMs");
}
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
@@ -150,8 +182,8 @@ public MiniOzoneCluster build() throws IOException {
}

final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
- MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
- scm, hddsDatanodes);
+ MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(
+ conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
@@ -215,9 +247,16 @@ private Map<String, OzoneManager> createOMService() throws IOException,
om.setCertClient(certClient);
omMap.put(nodeId, om);

- om.start();
- LOG.info("Started OzoneManager RPC server at " +
- om.getOmRpcServerAddr());
+ if (i <= numOfActiveOMs) {
+ om.start();
+ activeOMs.add(om);
+ LOG.info("Started OzoneManager RPC server at " +
+ om.getOmRpcServerAddr());
+ } else {
+ inactiveOMs.add(om);
+ LOG.info("Initialized OzoneManager at " + om.getOmRpcServerAddr()
+ + ". This OM is currently inactive (not running).");
+ }
}

// Set default OM address to point to the first OM. Clients would
@@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey;

/**
* Tests the Ratis snapshots feature in OM.
*/
public class TestOMRatisSnapshots {

private MiniOzoneHAClusterImpl cluster = null;
private ObjectStore objectStore;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
private int numOfOMs = 3;
private static final long SNAPSHOT_THRESHOLD = 50;
private static final int LOG_PURGE_GAP = 50;

@Rule
public ExpectedException exception = ExpectedException.none();

@Rule
public Timeout timeout = new Timeout(500_000);

/**
* Create a MiniOzoneCluster for testing. The cluster initially has one
* inactive OM. So at the start of the cluster, there will be 2 active and 1
* inactive OM.
*
* @throws Exception
*/
@Before
public void init() throws Exception {
conf = new OzoneConfiguration();
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId("om-service-test1")
.setNumOfOzoneManagers(numOfOMs)
.setNumOfActiveOMs(2)
.build();
cluster.waitForClusterToBeReady();
objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
}

/**
* Shut down the MiniOzoneCluster.
*/
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}

@Test
public void testInstallSnapshot() throws Exception {
// Get the leader OM
String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
.getCurrentProxyOMNodeId();
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();

// Find the inactive OM
String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
if (cluster.isOMActive(followerNodeId)) {
followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
}
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);

// Do some transactions so that the log index increases
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);

VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();

objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);

retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);

long leaderOMappliedLogIndex =
leaderRatisServer.getStateMachineLastAppliedIndex();

List<String> keys = new ArrayList<>();
while (leaderOMappliedLogIndex < 2000) {
keys.add(createKey(ozoneBucket));
leaderOMappliedLogIndex =
leaderRatisServer.getStateMachineLastAppliedIndex();
}

// Get the latest db checkpoint from the leader OM.
long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true);
DBCheckpoint leaderDbCheckpoint =
leaderOM.getMetadataManager().getStore().getCheckpoint(false);

// Start the inactive OM
cluster.startInactiveOM(followerNodeId);

// The recently started OM should be lagging behind the leader OM.
long followerOMLastAppliedIndex =
followerOM.getOmRatisServer().getStateMachineLastAppliedIndex();
Assert.assertTrue(
followerOMLastAppliedIndex < leaderOMSnaphsotIndex);

// Install leader OM's db checkpoint on the lagging OM.
followerOM.getOmRatisServer().getOmStateMachine().pause();
followerOM.getMetadataManager().getStore().close();
Contributor

Just trying to understand: in this test, the log purge gap is 50 and the snapshot interval is 50, so old logs will be purged. So when the inactive OM starts, Ratis will internally call notifyInstallSnapshot and do the reload automatically, right? Why is it being done manually in this test? Is it just to test the steps? If so, can we have the test let Ratis do this automatically? (I mean add a wait until it is done and then check the DB; not sure whether this is too much for an integration test.)

Contributor Author

@hanishakoneru, Jul 18, 2019

Yes, this was for the test only. The problem with testing end to end is that HttpGet does not work in unit tests. The CheckpointServlet (from Recon), which we use to download the checkpoint from the leader, uses HttpGet for the checkpoint transfer.

followerOM.replaceOMDBWithCheckpoint(
leaderOMSnaphsotIndex, leaderDbCheckpoint.getCheckpointLocation());

// Reload the follower OM with new DB checkpoint from the leader OM.
followerOM.reloadOMState(leaderOMSnaphsotIndex);
followerOM.getOmRatisServer().getOmStateMachine().unpause(
leaderOMSnaphsotIndex);

// After the new checkpoint is loaded and state machine is unpaused, the
// follower OM lastAppliedIndex must match the snapshot index of the
// checkpoint.
followerOMLastAppliedIndex = followerOM.getOmRatisServer()
.getStateMachineLastAppliedIndex();
Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex);

// Verify that the follower OM's DB contains the transactions which were
// made while it was inactive.
OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
followerOMMetaMngr.getVolumeKey(volumeName)));
Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
for (String key : keys) {
Assert.assertNotNull(followerOMMetaMngr.getKeyTable().get(
followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
}
}
}
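
The manual install sequence exercised by testInstallSnapshot (pause the state machine, swap in the leader's checkpoint, reload, unpause at the snapshot index) can be modeled with a small in-memory sketch. Everything here is hypothetical: `InstallSnapshotSketch`, `Node`, and their methods are stand-ins invented for illustration and do not correspond to OzoneManager or Ratis APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the install-snapshot steps; not Ozone code. */
public class InstallSnapshotSketch {

  /** Stand-in for an OM: a key-value "DB" plus the last applied log index. */
  public static final class Node {
    private Map<String, String> db = new HashMap<>();
    private long lastAppliedIndex = 0L;
    private boolean paused = false;

    /** Applies one transaction at the given log index. */
    public void apply(long index, String key, String value) {
      if (paused) {
        throw new IllegalStateException("State machine is paused");
      }
      db.put(key, value);
      lastAppliedIndex = index;
    }

    /** Returns a copy of the current DB, playing the role of a checkpoint. */
    public Map<String, String> checkpoint() {
      return new HashMap<>(db);
    }

    /** Replaces local state with a checkpoint taken at snapshotIndex. */
    public void installCheckpoint(long snapshotIndex,
        Map<String, String> checkpoint) {
      paused = true;                    // pause the state machine
      db = new HashMap<>(checkpoint);   // replace the DB and reload state
      lastAppliedIndex = snapshotIndex; // resume from the snapshot index
      paused = false;                   // unpause
    }

    public long getLastAppliedIndex() {
      return lastAppliedIndex;
    }

    public boolean containsKey(String key) {
      return db.containsKey(key);
    }
  }

  public static void main(String[] args) {
    Node leader = new Node();
    Node follower = new Node();
    // The leader applies transactions while the follower is inactive.
    for (long i = 1; i <= 5; i++) {
      leader.apply(i, "key" + i, "value" + i);
    }
    long snapshotIndex = leader.getLastAppliedIndex();
    follower.installCheckpoint(snapshotIndex, leader.checkpoint());
    System.out.println(follower.getLastAppliedIndex()); // prints 5
    System.out.println(follower.containsKey("key3"));   // prints true
  }
}
```

As in the test, the two checks after installation are that the follower's last applied index equals the snapshot index and that entries written while it was inactive are present.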