createOMService() throws IOException,
om.setCertClient(certClient);
omMap.put(nodeId, om);
- om.start();
- LOG.info("Started OzoneManager RPC server at " +
- om.getOmRpcServerAddr());
+ if (i <= numOfActiveOMs) {
+ om.start();
+ activeOMs.add(om);
+ LOG.info("Started OzoneManager RPC server at " +
+ om.getOmRpcServerAddr());
+ } else {
+ inactiveOMs.add(om);
+ LOG.info("Intialized OzoneManager at " + om.getOmRpcServerAddr()
+ + ". This OM is currently inactive (not running).");
+ }
}
// Set default OM address to point to the first OM. Clients would
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
new file mode 100644
index 0000000000000..6ac28c3de6eb3
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey;
+
+/**
+ * Tests the Ratis snaphsots feature in OM.
+ */
+public class TestOMRatisSnapshots {
+
+ private MiniOzoneHAClusterImpl cluster = null;
+ private ObjectStore objectStore;
+ private OzoneConfiguration conf;
+ private String clusterId;
+ private String scmId;
+ private int numOfOMs = 3;
+ private static final long SNAPSHOT_THRESHOLD = 50;
+ private static final int LOG_PURGE_GAP = 50;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Rule
+ public Timeout timeout = new Timeout(500_000);
+
+ /**
+ * Create a MiniOzoneCluster for testing. The cluster initially has one
+ * inactive OM. So at the start of the cluster, there will be 2 active and 1
+ * inactive OM.
+ *
+ * @throws IOException
+ */
+ @Before
+ public void init() throws Exception {
+ conf = new OzoneConfiguration();
+ clusterId = UUID.randomUUID().toString();
+ scmId = UUID.randomUUID().toString();
+ conf.setLong(
+ OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+ SNAPSHOT_THRESHOLD);
+ conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+ cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setOMServiceId("om-service-test1")
+ .setNumOfOzoneManagers(numOfOMs)
+ .setNumOfActiveOMs(2)
+ .build();
+ cluster.waitForClusterToBeReady();
+ objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+ }
+
+ /**
+ * Shutdown MiniDFSCluster.
+ */
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testInstallSnapshot() throws Exception {
+ // Get the leader OM
+ String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
+ .getCurrentProxyOMNodeId();
+ OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+ OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+ // Find the inactive OM
+ String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
+ if (cluster.isOMActive(followerNodeId)) {
+ followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
+ }
+ OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+ // Do some transactions so that the log index increases
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+ String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ objectStore.createVolume(volumeName, createVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ retVolumeinfo.createBucket(bucketName);
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ long leaderOMappliedLogIndex =
+ leaderRatisServer.getStateMachineLastAppliedIndex();
+ leaderOM.getOmRatisServer().getStateMachineLastAppliedIndex();
+
+ List keys = new ArrayList<>();
+ while (leaderOMappliedLogIndex < 2000) {
+ keys.add(createKey(ozoneBucket));
+ leaderOMappliedLogIndex =
+ leaderRatisServer.getStateMachineLastAppliedIndex();
+ }
+
+ // Get the latest db checkpoint from the leader OM.
+ long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true);
+ DBCheckpoint leaderDbCheckpoint =
+ leaderOM.getMetadataManager().getStore().getCheckpoint(false);
+
+ // Start the inactive OM
+ cluster.startInactiveOM(followerNodeId);
+
+ // The recently started OM should be lagging behind the leader OM.
+ long followerOMLastAppliedIndex =
+ followerOM.getOmRatisServer().getStateMachineLastAppliedIndex();
+ Assert.assertTrue(
+ followerOMLastAppliedIndex < leaderOMSnaphsotIndex);
+
+ // Install leader OM's db checkpoint on the lagging OM.
+ followerOM.getOmRatisServer().getOmStateMachine().pause();
+ followerOM.getMetadataManager().getStore().close();
+ followerOM.replaceOMDBWithCheckpoint(
+ leaderOMSnaphsotIndex, leaderDbCheckpoint.getCheckpointLocation());
+
+ // Reload the follower OM with new DB checkpoint from the leader OM.
+ followerOM.reloadOMState(leaderOMSnaphsotIndex);
+ followerOM.getOmRatisServer().getOmStateMachine().unpause(
+ leaderOMSnaphsotIndex);
+
+ // After the new checkpoint is loaded and state machine is unpaused, the
+ // follower OM lastAppliedIndex must match the snapshot index of the
+ // checkpoint.
+ followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+ .getStateMachineLastAppliedIndex();
+ Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex);
+
+ // Verify that the follower OM's DB contains the transactions which were
+ // made while it was inactive.
+ OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+ Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+ followerOMMetaMngr.getVolumeKey(volumeName)));
+ Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+ followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+ for (String key : keys) {
+ Assert.assertNotNull(followerOMMetaMngr.getKeyTable().get(
+ followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 05c53b313dfbd..92fc263cf957d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -829,7 +829,11 @@ public void testOMRatisSnapshot() throws Exception {
}
- private void createKey(OzoneBucket ozoneBucket) throws IOException {
+ /**
+ * Create a key in the bucket.
+ * @return the key name.
+ */
+ static String createKey(OzoneBucket ozoneBucket) throws IOException {
String keyName = "key" + RandomStringUtils.randomNumeric(5);
String data = "data" + RandomStringUtils.randomNumeric(5);
OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
@@ -837,5 +841,6 @@ private void createKey(OzoneBucket ozoneBucket) throws IOException {
ReplicationFactor.ONE, new HashMap<>());
ozoneOutputStream.write(data.getBytes(), 0, data.length());
ozoneOutputStream.close();
+ return keyName;
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index d54e1216a8693..b36a12880c279 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -126,7 +126,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
// ratis snapshot first. This step also included flushing the OM DB.
// Hence, we can set flush to false.
flush = false;
- ratisSnapshotIndex = om.saveRatisSnapshot();
+ ratisSnapshotIndex = om.saveRatisSnapshot(true);
} else {
ratisSnapshotIndex = om.loadRatisSnapshotIndex();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 46fdabd4bc9ca..dbadf685ed88b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -196,15 +196,18 @@ public void decNumKeys() {
}
public void setNumVolumes(long val) {
- this.numVolumes.incr(val);
+ long oldVal = this.numVolumes.value();
+ this.numVolumes.incr(val - oldVal);
}
public void setNumBuckets(long val) {
- this.numBuckets.incr(val);
+ long oldVal = this.numBuckets.value();
+ this.numBuckets.incr(val - oldVal);
}
public void setNumKeys(long val) {
- this.numKeys.incr(val);
+ long oldVal = this.numKeys.value();
+ this.numKeys.incr(val- oldVal);
}
public long getNumVolumes() {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 4312516329d6c..0267350a9e18c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -25,6 +25,7 @@
import com.google.protobuf.BlockingService;
import java.net.InetAddress;
+import java.nio.file.Path;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.KeyPair;
@@ -143,6 +144,10 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.utils.RetriableTask;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
import org.slf4j.Logger;
@@ -236,18 +241,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
- private final OMMetadataManager metadataManager;
- private final VolumeManager volumeManager;
- private final BucketManager bucketManager;
- private final KeyManager keyManager;
- private final PrefixManagerImpl prefixManager;
+
+ private OMMetadataManager metadataManager;
+ private VolumeManager volumeManager;
+ private BucketManager bucketManager;
+ private KeyManager keyManager;
+ private PrefixManagerImpl prefixManager;
+ private S3BucketManager s3BucketManager;
+
private final OMMetrics metrics;
private OzoneManagerHttpServer httpServer;
private final OMStorage omStorage;
private final ScmBlockLocationProtocol scmBlockClient;
private final StorageContainerLocationProtocol scmContainerClient;
private ObjectName omInfoBeanName;
- private final S3BucketManager s3BucketManager;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
private static final ObjectWriter WRITER =
@@ -258,7 +265,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final Runnable shutdownHook;
private final File omMetaDir;
private final boolean isAclEnabled;
- private final IAccessAuthorizer accessAuthorizer;
+ private IAccessAuthorizer accessAuthorizer;
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
@@ -308,12 +315,37 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
throw new OMException("OM not initialized.",
ResultCodes.OM_NOT_INITIALIZED);
}
+
+ // Read configuration and set values.
+ ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
+ omMetaDir = OmUtils.getOmDbDir(configuration);
+ this.isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
+ OZONE_ACL_ENABLED_DEFAULT);
+ this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
+ OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
+ this.preallocateBlocksMax = conf.getInt(
+ OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
+ OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
+ this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
+ HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
+ this.useRatisForReplication = conf.getBoolean(
+ DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+ // TODO: This is a temporary check. Once fully implemented, all OM state
+ // change should go through Ratis - be it standalone (for non-HA) or
+ // replicated (for HA).
+ isRatisEnabled = configuration.getBoolean(
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+
// Load HA related configurations
loadOMHAConfigs(configuration);
+ InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
+ omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
scmContainerClient = getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
scmBlockClient = getScmBlockClient(configuration);
+ this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
// For testing purpose only, not hit scm from om as Hadoop UGI can't login
// two principals in the same JVM.
@@ -329,16 +361,32 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- metadataManager = new OmMetadataManagerImpl(configuration);
+ secConfig = new SecurityConfig(configuration);
+ // Create the KMS Key Provider
+ try {
+ kmsProvider = createKeyProviderExt(configuration);
+ } catch (IOException ioe) {
+ kmsProvider = null;
+ LOG.error("Fail to create Key Provider");
+ }
+ if (secConfig.isSecurityEnabled()) {
+ omComponent = OM_DAEMON + "-" + omId;
+ if(omStorage.getOmCertSerialId() == null) {
+ throw new RuntimeException("OzoneManager started in secure mode but " +
+ "doesn't have SCM signed certificate.");
+ }
+ certClient = new OMCertificateClient(new SecurityConfig(conf),
+ omStorage.getOmCertSerialId());
+ }
+ if (secConfig.isBlockTokenEnabled()) {
+ blockTokenMgr = createBlockTokenSecretManager(configuration);
+ }
+
+ instantiateServices();
+
+ initializeRatisServer();
+ initializeRatisClient();
- // This is a temporary check. Once fully implemented, all OM state change
- // should go through Ratis - be it standalone (for non-HA) or replicated
- // (for HA).
- isRatisEnabled = configuration.getBoolean(
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
- startRatisServer();
- startRatisClient();
if (isRatisEnabled) {
// Create Ratis storage dir
String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration);
@@ -361,59 +409,44 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
OM_RATIS_SNAPSHOT_INDEX);
this.snapshotIndex = loadRatisSnapshotIndex();
- InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
- omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
- secConfig = new SecurityConfig(configuration);
- volumeManager = new VolumeManagerImpl(metadataManager, configuration);
+ metrics = OMMetrics.create();
- // Create the KMS Key Provider
- try {
- kmsProvider = createKeyProviderExt(configuration);
- } catch (IOException ioe) {
- kmsProvider = null;
- LOG.error("Fail to create Key Provider");
- }
+ // Start Om Rpc Server.
+ omRpcServer = getRpcServer(conf);
+ omRpcAddress = updateRPCListenAddress(configuration,
+ OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+
+ shutdownHook = () -> {
+ saveOmMetrics();
+ };
+ ShutdownHookManager.get().addShutdownHook(shutdownHook,
+ SHUTDOWN_HOOK_PRIORITY);
+ }
+
+ /**
+ * Instantiate services which are dependent on the OM DB state.
+ * When OM state is reloaded, these services are re-initialized with the
+ * new OM state.
+ */
+ private void instantiateServices() throws IOException {
+ metadataManager = new OmMetadataManagerImpl(configuration);
+ volumeManager = new VolumeManagerImpl(metadataManager, configuration);
bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider(),
isRatisEnabled);
- metrics = OMMetrics.create();
-
s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
volumeManager, bucketManager);
if (secConfig.isSecurityEnabled()) {
- omComponent = OM_DAEMON + "-" + omId;
- if(omStorage.getOmCertSerialId() == null) {
- throw new RuntimeException("OzoneManager started in secure mode but " +
- "doesn't have SCM signed certificate.");
- }
- certClient = new OMCertificateClient(new SecurityConfig(conf),
- omStorage.getOmCertSerialId());
s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
- if (secConfig.isBlockTokenEnabled()) {
- blockTokenMgr = createBlockTokenSecretManager(configuration);
- }
-
- omRpcServer = getRpcServer(conf);
- omRpcAddress = updateRPCListenAddress(configuration,
- OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
-
- this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
prefixManager = new PrefixManagerImpl(metadataManager);
keyManager = new KeyManagerImpl(this, scmClient, configuration,
omStorage.getOmId());
- shutdownHook = () -> {
- saveOmMetrics();
- };
- ShutdownHookManager.get().addShutdownHook(shutdownHook,
- SHUTDOWN_HOOK_PRIORITY);
- isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
- OZONE_ACL_ENABLED_DEFAULT);
if (isAclEnabled) {
- accessAuthorizer = getACLAuthorizerInstance(conf);
+ accessAuthorizer = getACLAuthorizerInstance(configuration);
if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
OzoneNativeAuthorizer authorizer =
(OzoneNativeAuthorizer) accessAuthorizer;
@@ -425,17 +458,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
} else {
accessAuthorizer = null;
}
- ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
- omMetaDir = OmUtils.getOmDbDir(configuration);
- this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
- OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
- this.preallocateBlocksMax = conf.getInt(
- OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
- OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
- this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
- HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
- this.useRatisForReplication = conf.getBoolean(
- DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
}
/**
@@ -1235,6 +1257,14 @@ public void start() throws IOException {
DefaultMetricsSystem.initialize("OzoneManager");
+ // Start Ratis services
+ if (omRatisServer != null) {
+ omRatisServer.start();
+ }
+ if (omRatisClient != null) {
+ omRatisClient.connect();
+ }
+
metadataManager.start(configuration);
startSecretManagerIfNecessary();
@@ -1305,8 +1335,14 @@ public void restart() throws IOException {
omRpcServer.start();
isOmRpcServerRunning = true;
- startRatisServer();
- startRatisClient();
+ initializeRatisServer();
+ if (omRatisServer != null) {
+ omRatisServer.start();
+ }
+ initializeRatisClient();
+ if (omRatisClient != null) {
+ omRatisClient.connect();
+ }
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
@@ -1353,15 +1389,13 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
/**
* Creates an instance of ratis server.
*/
- private void startRatisServer() throws IOException {
+ private void initializeRatisServer() throws IOException {
if (isRatisEnabled) {
if (omRatisServer == null) {
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
configuration, this, omNodeDetails, peerNodes);
}
- omRatisServer.start();
-
- LOG.info("OzoneManager Ratis server started at port {}",
+ LOG.info("OzoneManager Ratis server initialized at port {}",
omRatisServer.getServerPort());
} else {
omRatisServer = null;
@@ -1371,14 +1405,13 @@ private void startRatisServer() throws IOException {
/**
* Creates an instance of ratis client.
*/
- private void startRatisClient() throws IOException {
+ private void initializeRatisClient() throws IOException {
if (isRatisEnabled) {
if (omRatisClient == null) {
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(),
configuration);
}
- omRatisClient.connect();
} else {
omRatisClient = null;
}
@@ -1398,11 +1431,13 @@ public long loadRatisSnapshotIndex() {
}
@Override
- public long saveRatisSnapshot() throws IOException {
+ public long saveRatisSnapshot(boolean flush) throws IOException {
snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
- // Flush the OM state to disk
- getMetadataManager().getStore().flush();
+ if (flush) {
+ // Flush the OM state to disk
+ metadataManager.getStore().flush();
+ }
PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
@@ -2697,7 +2732,6 @@ public List listS3Buckets(String userName, String startKey,
}
}
}
-
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {
@@ -3069,6 +3103,179 @@ public List getAcl(OzoneObj obj) throws IOException {
}
}
+ /**
+ * Download and install latest checkpoint from leader OM.
+ * If the download checkpoints snapshot index is greater than this OM's
+ * last applied transaction index, then re-initialize the OM state via this
+ * checkpoint. Before re-initializing OM state, the OM Ratis server should
+ * be stopped so that no new transactions can be applied.
+ * @param leaderId peerNodeID of the leader OM
+ * @return If checkpoint is installed, return the corresponding termIndex.
+ * Otherwise, return null.
+ */
+ public TermIndex installSnapshot(String leaderId) {
+ if (omSnapshotProvider == null) {
+ LOG.error("OM Snapshot Provider is not configured as there are no peer " +
+ "nodes.");
+ return null;
+ }
+
+ DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
+ Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
+
+ // Check if current ratis log index is smaller than the downloaded
+ // snapshot index. If yes, proceed by stopping the ratis server so that
+ // the OM state can be re-initialized. If no, then do not proceed with
+ // installSnapshot.
+ long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex();
+ long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
+ if (checkpointSnapshotIndex <= lastAppliedIndex) {
+ LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
+ "applied index: {} is greater than or equal to the checkpoint's " +
+ "snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId,
+ lastAppliedIndex, checkpointSnapshotIndex,
+ newDBlocation);
+ try {
+ FileUtils.deleteFully(newDBlocation);
+ } catch (IOException e) {
+ LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
+ "from OM leader {}.", newDBlocation,
+ leaderId, e);
+ }
+ return null;
+ }
+
+ // Pause the State Machine so that no new transactions can be applied.
+ // This action also clears the OM Double Buffer so that if there are any
+ // pending transactions in the buffer, they are discarded.
+ // TODO: The Ratis server should also be paused here. This is required
+ // because a leader election might happen while the snapshot
+ // installation is in progress and the new leader might start sending
+ // append log entries to the ratis server.
+ omRatisServer.getOmStateMachine().pause();
+
+ File dbBackup;
+ try {
+ dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, newDBlocation);
+ } catch (Exception e) {
+ LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " +
+ "failed.", e);
+ return null;
+ }
+
+ // Reload the OM DB store with the new checkpoint.
+ // Restart (unpause) the state machine and update its last applied index
+ // to the installed checkpoint's snapshot index.
+ try {
+ reloadOMState(checkpointSnapshotIndex);
+ omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex);
+ } catch (IOException e) {
+ LOG.error("Failed to reload OM state with new DB checkpoint.", e);
+ return null;
+ }
+
+ // Delete the backup DB
+ try {
+ FileUtils.deleteFully(dbBackup);
+ } catch (IOException e) {
+ LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
+ }
+
+ // TODO: We should only return the snpashotIndex to the leader.
+ // Should be fixed after RATIS-586
+ TermIndex newTermIndex = TermIndex.newTermIndex(0,
+ checkpointSnapshotIndex);
+
+ return newTermIndex;
+ }
+
+ /**
+ * Download the latest OM DB checkpoint from the leader OM.
+ * @param leaderId OMNodeID of the leader OM node.
+ * @return latest DB checkpoint from leader OM.
+ */
+ private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
+ LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
+ "from the checkpoint.", leaderId);
+
+ try {
+ return omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
+ } catch (IOException e) {
+ LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
+ }
+ return null;
+ }
+
+ /**
+ * Replace the current OM DB with the new DB checkpoint.
+ * @param lastAppliedIndex the last applied index in the current OM DB.
+ * @param checkpointPath path to the new DB checkpoint
+ * @return location of the backup of the original DB
+ * @throws Exception
+ */
+ File replaceOMDBWithCheckpoint(long lastAppliedIndex, Path checkpointPath)
+ throws Exception {
+ // Stop the DB first
+ DBStore store = metadataManager.getStore();
+ store.close();
+
+ // Take a backup of the current DB
+ File db = store.getDbLocation();
+ String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
+ lastAppliedIndex + "_" + System.currentTimeMillis();
+ File dbBackup = new File(db.getParentFile(), dbBackupName);
+
+ try {
+ Files.move(db.toPath(), dbBackup.toPath());
+ } catch (IOException e) {
+ LOG.error("Failed to create a backup of the current DB. Aborting " +
+ "snapshot installation.");
+ throw e;
+ }
+
+ // Move the new DB checkpoint into the om metadata dir
+ try {
+ Files.move(checkpointPath, db.toPath());
+ } catch (IOException e) {
+ LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
+ "directory {}. Resetting to original DB.", checkpointPath,
+ db.toPath());
+ Files.move(dbBackup.toPath(), db.toPath());
+ throw e;
+ }
+ return dbBackup;
+ }
+
+ /**
+ * Re-instantiate MetadataManager with new DB checkpoint.
+ * All the classes which use/ store MetadataManager should also be updated
+ * with the new MetadataManager instance.
+ */
+ void reloadOMState(long newSnapshotIndex) throws IOException {
+
+ instantiateServices();
+
+ // Restart required services
+ metadataManager.start(configuration);
+ keyManager.start(configuration);
+
+ // Set metrics and start metrics back ground thread
+ metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
+ .getVolumeTable()));
+ metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager
+ .getBucketTable()));
+
+ // Delete the omMetrics file if it exists and save the a new metrics file
+ // with new data
+ Files.deleteIfExists(getMetricsStorageFile().toPath());
+ saveOmMetrics();
+
+ // Update OM snapshot index with the new snapshot index (from the new OM
+ // DB state) and save the snapshot index to disk
+ this.snapshotIndex = newSnapshotIndex;
+ saveRatisSnapshot(false);
+ }
+
public static Logger getLogger() {
return LOG;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 49a84da9f3abb..1e51273c12302 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -308,10 +308,15 @@ public RaftGroup getRaftGroup() {
}
/**
- * Returns OzoneManager StateMachine.
+ * Initializes and returns OzoneManager StateMachine.
*/
private OzoneManagerStateMachine getStateMachine() {
- return new OzoneManagerStateMachine(this);
+ return new OzoneManagerStateMachine(this);
+ }
+
+ @VisibleForTesting
+ public OzoneManagerStateMachine getOmStateMachine() {
+ return omStateMachine;
}
public OzoneManager getOzoneManager() {
@@ -387,6 +392,12 @@ private RaftProperties newRaftProperties(Configuration conf) {
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+ false);
+ final int logPurgeGap = conf.getInt(
+ OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
+ OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
+ RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
// For grpc set the maximum message size
// TODO: calculate the optimal max message size
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 31c467d918716..c51323e3a7d6e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
@@ -43,12 +44,15 @@
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,8 +72,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
- private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+ private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final ExecutorService executorService;
+ private final ExecutorService installSnapshotExecutor;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
@@ -82,19 +87,20 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
+ this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
}
/**
* Initializes the State Machine with the given server, group and storage.
- * TODO: Load the latest snapshot from the file system.
*/
@Override
- public void initialize(
- RaftServer server, RaftGroupId id, RaftStorage raftStorage)
- throws IOException {
- super.initialize(server, id, raftStorage);
- this.raftGroupId = id;
- storage.init(raftStorage);
+ public void initialize(RaftServer server, RaftGroupId id,
+ RaftStorage raftStorage) throws IOException {
+ lifeCycle.startAndTransition(() -> {
+ super.initialize(server, id, raftStorage);
+ this.raftGroupId = id;
+ storage.init(raftStorage);
+ });
}
/**
@@ -185,6 +191,27 @@ public CompletableFuture query(Message request) {
}
}
+ @Override
+ public void pause() {
+ lifeCycle.transition(LifeCycle.State.PAUSING);
+ lifeCycle.transition(LifeCycle.State.PAUSED);
+ ozoneManagerDoubleBuffer.stop();
+ }
+
+ /**
+ * Unpause the StateMachine, re-initialize the DoubleBuffer and update the
+ * lastAppliedIndex. This should be done after uploading new state to the
+ * StateMachine.
+ */
+ public void unpause(long newLastAppliedSnaphsotIndex) {
+ lifeCycle.startAndTransition(() -> {
+ this.ozoneManagerDoubleBuffer =
+ new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
+ this::updateLastAppliedIndex);
+ this.updateLastAppliedIndex(newLastAppliedSnaphsotIndex);
+ });
+ }
+
/**
* Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
* is the log index corresponding to the last applied transaction on the OM
@@ -197,11 +224,44 @@ public CompletableFuture query(Message request) {
public long takeSnapshot() throws IOException {
LOG.info("Saving Ratis snapshot on the OM.");
if (ozoneManager != null) {
- return ozoneManager.saveRatisSnapshot();
+ return ozoneManager.saveRatisSnapshot(true);
}
return 0;
}
+ /**
+ * Leader OM has purged entries from its log. To catch up, OM must download
+ * the latest checkpoint from the leader OM and install it.
+ * @param roleInfoProto the leader node information
+ * @param firstTermIndexInLog TermIndex of the first append entry available
+ * in the Leader's log.
+ * @return the last term index included in the installed snapshot.
+ */
+ @Override
+ public CompletableFuture notifyInstallSnapshotFromLeader(
+ RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+
+ String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId())
+ .toString();
+
+ LOG.info("Received install snapshot notificaiton form OM leader: {} with " +
+ "term index: {}", leaderNodeId, firstTermIndexInLog);
+
+ if (!roleInfoProto.getRole().equals(RaftProtos.RaftPeerRole.LEADER)) {
+ // A non-leader Ratis server should not send this notification.
+ LOG.error("Received Install Snapshot notification from non-leader OM " +
+ "node: {}. Ignoring the notification.", leaderNodeId);
+ return completeExceptionally(new OMException("Received notification to " +
+ "install snaphost from non-leader OM node",
+ OMException.ResultCodes.RATIS_ERROR));
+ }
+
+ CompletableFuture future = CompletableFuture.supplyAsync(
+ () -> ozoneManager.installSnapshot(leaderNodeId),
+ installSnapshotExecutor);
+ return future;
+ }
+
/**
* Notifies the state machine that the raft peer is no longer leader.
*/
@@ -276,10 +336,9 @@ public void setRaftGroupId(RaftGroupId raftGroupId) {
this.raftGroupId = raftGroupId;
}
-
public void stop() {
ozoneManagerDoubleBuffer.stop();
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+ HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
}
-
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
index e1d488923a0ec..87446db3de05f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
@@ -149,7 +149,7 @@ private void closeHttpClient() throws IOException {
* @param leaderOMNodeID leader OM Node ID.
* @return the DB checkpoint (including the ratis snapshot index)
*/
- protected DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
+ public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
throws IOException {
String snapshotFileName = OM_SNAPSHOT_DB + "_" + System.currentTimeMillis();
File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz");