Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.hadoop.hdds;

import org.apache.hadoop.utils.db.DBProfile;
import org.apache.ratis.util.TimeDuration;

import java.util.concurrent.TimeUnit;

/**
* This class contains constants for configuration keys and default values
Expand Down Expand Up @@ -70,6 +73,14 @@ public final class HddsConfigKeys {
"hdds.scm.chillmode.min.datanode";
public static final int HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT = 1;


/**
* Configuration key for the interval to wait after exiting chill mode
* before replication is enabled. The value is a duration string.
*/
public static final String
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT =
"hdds.scm.wait.time.after.chillmode.exit";

/**
* Default value for {@link #HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT}:
* the duration string "5m" (five minutes).
*/
public static final String
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT = "5m";

public static final String HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK =
"hdds.scm.chillmode.pipeline-availability.check";
public static final boolean
Expand Down
10 changes: 10 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,16 @@
</description>
</property>

<property>
<name>hdds.scm.wait.time.after.chillmode.exit</name>
<value>5m</value>
<tag>HDDS,SCM,OPERATION</tag>
<description> After exiting chill mode, SCM waits for the configured
interval of time before starting the replication monitor and the cleanup
of unhealthy pipelines.
</description>
</property>

<property>
<name>hdds.scm.chillmode.enabled</name>
<value>true</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,11 @@ AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType type,
* @return the block deleting service executed in SCM.
*/
SCMBlockDeletingService getSCMBlockDeletingService();

/**
* Set ChillMode status.
*
* @param chillModeStatus {@code true} if SCM is currently in chill mode,
* {@code false} once chill mode has been exited.
*/
void setChillModeStatus(boolean chillModeStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@


/** Block Manager manages the block access for SCM. */
public class BlockManagerImpl implements EventHandler<Boolean>,
BlockManager, BlockmanagerMXBean {
public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private static final Logger LOG =
LoggerFactory.getLogger(BlockManagerImpl.class);
// TODO : FIX ME : Hard coding the owner.
Expand Down Expand Up @@ -337,8 +336,8 @@ public SCMBlockDeletingService getSCMBlockDeletingService() {
}

@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
this.chillModePrecheck.setInChillMode(inChillMode);
public void setChillModeStatus(boolean chillModeStatus) {
// Forward the flag unchanged to the chill mode precheck held by this
// block manager.
this.chillModePrecheck.setInChillMode(chillModeStatus);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.hdds.scm.chillmode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.replication.
ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Class to handle the activities needed to be performed after exiting chill
 * mode.
 *
 * <p>It listens for {@link ChillModeStatus} events and forwards the reported
 * status to the replication activity status, the SCM client protocol server
 * and the block manager.
 */
public class ChillModeHandler implements EventHandler<ChillModeStatus> {

  private final SCMClientProtocolServer scmClientProtocolServer;
  private final BlockManager scmBlockManager;
  // Delay, in milliseconds, handed to the replication activity status on
  // every chill mode status event.
  private final long waitTime;
  private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
  private final ReplicationActivityStatus replicationActivityStatus;

  /**
   * ChillModeHandler, to handle the logic once we exit chill mode.
   *
   * @param configuration source of the wait time
   *     ({@link HddsConfigKeys#HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT}) and
   *     of the chill-mode-enabled flag; must not be null.
   * @param clientProtocolServer server to be told about chill mode status
   *     changes; must not be null.
   * @param blockManager block manager to be told about chill mode status
   *     changes; must not be null.
   * @param replicationStatus replication activity status to be told about
   *     chill mode status changes; must not be null.
   * @throws NullPointerException if any argument is null.
   */
  public ChillModeHandler(Configuration configuration,
      SCMClientProtocolServer clientProtocolServer,
      BlockManager blockManager,
      ReplicationActivityStatus replicationStatus) {
    Objects.requireNonNull(configuration, "Configuration cannot be null");
    Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
        "object cannot be null");
    Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
    Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " +
        "object cannot be null");
    this.waitTime = configuration.getTimeDuration(
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
        TimeUnit.MILLISECONDS);
    scmClientProtocolServer = clientProtocolServer;
    scmBlockManager = blockManager;
    replicationActivityStatus = replicationStatus;

    // Until the first status event arrives, the handler reports "in chill
    // mode" exactly when chill mode is enabled in the configuration.
    boolean chillModeEnabled = configuration.getBoolean(
        HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
        HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
    isInChillMode.set(chillModeEnabled);
  }

  /**
   * Set ChillMode status based on
   * {@link org.apache.hadoop.hdds.scm.events.SCMEvents#CHILL_MODE_STATUS}.
   *
   * Inform BlockManager, ScmClientProtocolServer and replicationActivity
   * status about chillMode status.
   *
   * @param chillModeStatus event payload carrying the new chill mode status.
   * @param publisher event publisher supplied by the event framework; not
   *     used by this handler.
   */
  @Override
  public void onMessage(ChillModeStatus chillModeStatus,
      EventPublisher publisher) {
    // Read the event value once and hand that same value to every consumer.
    // Re-reading the shared AtomicBoolean for each call could deliver
    // different values if another event updates it in between.
    final boolean inChillMode = chillModeStatus.getChillModeStatus();
    isInChillMode.set(inChillMode);

    replicationActivityStatus.fireReplicationStart(inChillMode, waitTime);
    scmClientProtocolServer.setChillModeStatus(inChillMode);
    scmBlockManager.setChillModeStatus(inChillMode);
  }

  /**
   * Returns the most recently recorded chill mode status.
   *
   * @return the value from the last status event, or the configured
   *     chill-mode-enabled flag if no event has been handled yet.
   */
  public boolean getChillModeStatus() {
    return isInChillMode.get();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public SCMChillModeManager(Configuration conf,
this.isChillModeEnabled = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);

if (isChillModeEnabled) {
ContainerChillModeRule containerChillModeRule =
new ContainerChillModeRule(config, allContainers, this);
Expand Down Expand Up @@ -111,7 +112,8 @@ public SCMChillModeManager(Configuration conf,
*/
@VisibleForTesting
public void emitChillModeStatus() {
eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, getInChillMode());
eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new ChillModeStatus(getInChillMode()));
}

public void validateChillModeExitRules(EventPublisher eventQueue) {
Expand Down Expand Up @@ -185,4 +187,20 @@ public OneReplicaPipelineChillModeRule getOneReplicaPipelineChillModeRule() {
exitRules.get(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE);
}


/**
 * Class used during ChillMode status event.
 *
 * <p>Immutable holder for a single boolean: the chill mode status carried by
 * the event.
 */
public static class ChillModeStatus {

  // final: the status of an event never changes after construction.
  private final boolean chillModeStatus;

  /**
   * Creates a status payload.
   *
   * @param chillModeState the chill mode status to carry.
   */
  public ChillModeStatus(boolean chillModeState) {
    this.chillModeStatus = chillModeState;
  }

  /**
   * Returns the status this payload was created with.
   *
   * @return the chill mode status passed to the constructor.
   */
  public boolean getChillModeStatus() {
    return chillModeStatus;
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import javax.management.ObjectName;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.metrics2.util.MBeans;


import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,15 +41,8 @@ public class ReplicationActivityStatus implements
LoggerFactory.getLogger(ReplicationActivityStatus.class);

private AtomicBoolean replicationEnabled = new AtomicBoolean();
private AtomicBoolean replicationStatusSetExternally = new AtomicBoolean();
private ObjectName jmxObjectName;
private ReplicationStatusListener replicationStatusListener;
private ChillModeStatusListener chillModeStatusListener;

public ReplicationActivityStatus(){
replicationStatusListener = new ReplicationStatusListener();
chillModeStatusListener = new ChillModeStatusListener();
}
@Override
public boolean isReplicationEnabled() {
// Reads the current value of the replicationEnabled atomic flag.
return replicationEnabled.get();
}
}

/**
* Replication status listener.
*/
class ReplicationStatusListener implements EventHandler<Boolean> {
@Override
public void onMessage(Boolean status, EventPublisher publisher) {
replicationStatusSetExternally.set(true);
replicationEnabled.set(status);
}
}

/**
* Replication status is influenced by Chill mode status as well.
* Waits for
* {@link HddsConfigKeys#HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT} and set
* replicationEnabled to start replication monitor thread.
*/
class ChillModeStatusListener implements EventHandler<Boolean> {

@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
if (!replicationStatusSetExternally.get()) {
replicationEnabled.set(!inChillMode);
}
/**
 * Enables replication after a delay, but only when the reported status says
 * chill mode has been exited.
 *
 * <p>The wait runs asynchronously, so this method returns immediately. If
 * the waiting thread is interrupted, replication is NOT enabled (as the
 * logged error states) and the thread's interrupt flag is restored.
 *
 * @param chillModeStatus {@code true} while in chill mode, in which case
 *     nothing is scheduled.
 * @param waitTime time to wait, in milliseconds, before enabling
 *     replication; values of zero or less mean no wait.
 */
public void fireReplicationStart(boolean chillModeStatus,
    long waitTime) {
  if (chillModeStatus) {
    // Still in chill mode: nothing to schedule.
    return;
  }
  // NOTE(review): runAsync without an executor runs on the common
  // ForkJoinPool, so a pool thread is blocked for the whole wait.
  CompletableFuture.runAsync(() -> {
    try {
      // Thread.sleep rejects negative values with an exception that would be
      // swallowed by the future, leaving replication disabled forever.
      if (waitTime > 0) {
        Thread.sleep(waitTime);
      }
    } catch (InterruptedException ex) {
      // Restore the interrupt flag and stop here, so that the behaviour
      // matches the message: the replication event really is not fired.
      Thread.currentThread().interrupt();
      LOG.error("Interrupted during wait, replication event is not fired",
          ex);
      return;
    }
    setReplicationEnabled(true);
    LOG.info("Replication Timer sleep for {} ms completed. Enable " +
        "Replication", waitTime);
  });
}

public ReplicationStatusListener getReplicationStatusListener() {
return replicationStatusListener;
}

public ChillModeStatusListener getChillModeStatusListener() {
return chillModeStatusListener;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
Expand Down Expand Up @@ -253,8 +254,8 @@ public final class SCMEvents {
*/
public static final TypedEvent<Boolean> START_REPLICATION =
new TypedEvent<>(Boolean.class);
public static final TypedEvent<Boolean> CHILL_MODE_STATUS =
new TypedEvent<>(Boolean.class);
public static final TypedEvent<ChillModeStatus> CHILL_MODE_STATUS =
new TypedEvent<>(ChillModeStatus.class);

/**
* Private Ctor. Never Constructed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
Expand Down Expand Up @@ -95,7 +93,7 @@
* The RPC server that listens to requests from clients.
*/
public class SCMClientProtocolServer implements
StorageContainerLocationProtocol, EventHandler<Boolean>, Auditor {
StorageContainerLocationProtocol, Auditor {
private static final Logger LOG =
LoggerFactory.getLogger(SCMClientProtocolServer.class);
private static final AuditLogger AUDIT =
Expand Down Expand Up @@ -496,14 +494,6 @@ public StorageContainerManager getScm() {
return scm;
}

/**
* Set chill mode status based on SCMEvents.CHILL_MODE_STATUS event.
*/
@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
chillModePrecheck.setInChillMode(inChillMode);
}

/**
* Set chill mode status based on .
*/
Expand Down Expand Up @@ -561,4 +551,13 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
public void close() throws IOException {
stop();
}

/**
* Set ChillMode status.
*
* The value is forwarded unchanged to the chill mode precheck held by this
* server.
*
* @param chillModeStatus {@code true} if SCM is currently in chill mode,
* {@code false} once chill mode has been exited.
*/
public void setChillModeStatus(boolean chillModeStatus) {
chillModePrecheck.setInChillMode(chillModeStatus);
}
}
Loading