Skip to content

Commit 341c076

Browse files
bharatviswa504 authored and nandakumar131 committed
HDDS-1196. Add a ReplicationStartTimer class. (#567)
1 parent e026041 commit 341c076

File tree

17 files changed

+361
-79
lines changed

17 files changed

+361
-79
lines changed

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.apache.hadoop.hdds;
1818

1919
import org.apache.hadoop.utils.db.DBProfile;
20+
import org.apache.ratis.util.TimeDuration;
21+
22+
import java.util.concurrent.TimeUnit;
2023

2124
/**
2225
* This class contains constants for configuration keys and default values
@@ -70,6 +73,14 @@ public final class HddsConfigKeys {
7073
"hdds.scm.chillmode.min.datanode";
7174
public static final int HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT = 1;
7275

76+
77+
public static final String
78+
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT =
79+
"hdds.scm.wait.time.after.chillmode.exit";
80+
81+
public static final String
82+
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT = "5m";
83+
7384
public static final String HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK =
7485
"hdds.scm.chillmode.pipeline-availability.check";
7586
public static final boolean

hadoop-hdds/common/src/main/resources/ozone-default.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,16 @@
12891289
</description>
12901290
</property>
12911291

1292+
<property>
1293+
<name>hdds.scm.wait.time.after.chillmode.exit</name>
1294+
<value>5m</value>
1295+
<tag>HDDS,SCM,OPERATION</tag>
1296+
<description> After exiting chillmode, wait for configured interval of
1297+
time to start replication monitor and cleanup activities of unhealthy
1298+
pipelines.
1299+
</description>
1300+
</property>
1301+
12921302
<property>
12931303
<name>hdds.scm.chillmode.enabled</name>
12941304
<value>true</value>

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,11 @@ AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType type,
7979
* @return the block deleting service executed in SCM.
8080
*/
8181
SCMBlockDeletingService getSCMBlockDeletingService();
82+
83+
/**
84+
* Set ChillMode status.
85+
*
86+
* @param chillModeStatus
87+
*/
88+
void setChillModeStatus(boolean chillModeStatus);
8289
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@
6666

6767

6868
/** Block Manager manages the block access for SCM. */
69-
public class BlockManagerImpl implements EventHandler<Boolean>,
70-
BlockManager, BlockmanagerMXBean {
69+
public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
7170
private static final Logger LOG =
7271
LoggerFactory.getLogger(BlockManagerImpl.class);
7372
// TODO : FIX ME : Hard coding the owner.
@@ -337,8 +336,8 @@ public SCMBlockDeletingService getSCMBlockDeletingService() {
337336
}
338337

339338
@Override
340-
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
341-
this.chillModePrecheck.setInChillMode(inChillMode);
339+
public void setChillModeStatus(boolean chillModeStatus) {
340+
this.chillModePrecheck.setInChillMode(chillModeStatus);
342341
}
343342

344343
/**
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* <p>
9+
* <p>http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* <p>Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.chillmode;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.hdds.HddsConfigKeys;
22+
import org.apache.hadoop.hdds.scm.block.BlockManager;
23+
import org.apache.hadoop.hdds.scm.container.replication.
24+
ReplicationActivityStatus;
25+
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
26+
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
27+
import org.apache.hadoop.hdds.server.events.EventHandler;
28+
import org.apache.hadoop.hdds.server.events.EventPublisher;
29+
30+
import java.util.Objects;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
34+
/**
35+
* Class to handle the activities needed to be performed after exiting chill
36+
* mode.
37+
*/
38+
public class ChillModeHandler implements EventHandler<ChillModeStatus> {
39+
40+
private final SCMClientProtocolServer scmClientProtocolServer;
41+
private final BlockManager scmBlockManager;
42+
private final long waitTime;
43+
private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
44+
private final ReplicationActivityStatus replicationActivityStatus;
45+
46+
47+
/**
48+
* ChillModeHandler, to handle the logic once we exit chill mode.
49+
* @param configuration
50+
* @param clientProtocolServer
51+
* @param blockManager
52+
* @param replicationStatus
53+
*/
54+
public ChillModeHandler(Configuration configuration,
55+
SCMClientProtocolServer clientProtocolServer,
56+
BlockManager blockManager,
57+
ReplicationActivityStatus replicationStatus) {
58+
Objects.requireNonNull(configuration, "Configuration cannot be null");
59+
Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
60+
"object cannot be null");
61+
Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
62+
Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " +
63+
"object cannot be null");
64+
this.waitTime = configuration.getTimeDuration(
65+
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
66+
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
67+
TimeUnit.MILLISECONDS);
68+
scmClientProtocolServer = clientProtocolServer;
69+
scmBlockManager = blockManager;
70+
replicationActivityStatus = replicationStatus;
71+
72+
boolean chillModeEnabled = configuration.getBoolean(
73+
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
74+
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
75+
isInChillMode.set(chillModeEnabled);
76+
77+
}
78+
79+
/**
80+
* Set ChillMode status based on
81+
* {@link org.apache.hadoop.hdds.scm.events.SCMEvents#CHILL_MODE_STATUS}.
82+
*
83+
* Inform BlockManager, ScmClientProtocolServer and replicationAcitivity
84+
* status about chillMode status.
85+
*
86+
* @param chillModeStatus
87+
* @param publisher
88+
*/
89+
@Override
90+
public void onMessage(ChillModeStatus chillModeStatus,
91+
EventPublisher publisher) {
92+
isInChillMode.set(chillModeStatus.getChillModeStatus());
93+
94+
replicationActivityStatus.fireReplicationStart(isInChillMode.get(),
95+
waitTime);
96+
scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
97+
scmBlockManager.setChillModeStatus(isInChillMode.get());
98+
99+
}
100+
101+
public boolean getChillModeStatus() {
102+
return isInChillMode.get();
103+
}
104+
105+
106+
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public SCMChillModeManager(Configuration conf,
7373
this.isChillModeEnabled = conf.getBoolean(
7474
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
7575
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
76+
7677
if (isChillModeEnabled) {
7778
ContainerChillModeRule containerChillModeRule =
7879
new ContainerChillModeRule(config, allContainers, this);
@@ -111,7 +112,8 @@ public SCMChillModeManager(Configuration conf,
111112
*/
112113
@VisibleForTesting
113114
public void emitChillModeStatus() {
114-
eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, getInChillMode());
115+
eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS,
116+
new ChillModeStatus(getInChillMode()));
115117
}
116118

117119
public void validateChillModeExitRules(EventPublisher eventQueue) {
@@ -185,4 +187,20 @@ public OneReplicaPipelineChillModeRule getOneReplicaPipelineChillModeRule() {
185187
exitRules.get(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE);
186188
}
187189

190+
191+
/**
192+
* Class used during ChillMode status event.
193+
*/
194+
public static class ChillModeStatus {
195+
196+
private boolean chillModeStatus;
197+
public ChillModeStatus(boolean chillModeState) {
198+
this.chillModeStatus = chillModeState;
199+
}
200+
201+
public boolean getChillModeStatus() {
202+
return chillModeStatus;
203+
}
204+
}
205+
188206
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import javax.management.ObjectName;
2121
import java.io.Closeable;
2222
import java.io.IOException;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.atomic.AtomicBoolean;
24-
import org.apache.hadoop.hdds.server.events.EventHandler;
25-
import org.apache.hadoop.hdds.server.events.EventPublisher;
25+
26+
import org.apache.hadoop.hdds.HddsConfigKeys;
2627
import org.apache.hadoop.metrics2.util.MBeans;
2728

29+
2830
import com.google.common.annotations.VisibleForTesting;
2931
import org.slf4j.Logger;
3032
import org.slf4j.LoggerFactory;
@@ -39,15 +41,8 @@ public class ReplicationActivityStatus implements
3941
LoggerFactory.getLogger(ReplicationActivityStatus.class);
4042

4143
private AtomicBoolean replicationEnabled = new AtomicBoolean();
42-
private AtomicBoolean replicationStatusSetExternally = new AtomicBoolean();
4344
private ObjectName jmxObjectName;
44-
private ReplicationStatusListener replicationStatusListener;
45-
private ChillModeStatusListener chillModeStatusListener;
4645

47-
public ReplicationActivityStatus(){
48-
replicationStatusListener = new ReplicationStatusListener();
49-
chillModeStatusListener = new ChillModeStatusListener();
50-
}
5146
@Override
5247
public boolean isReplicationEnabled() {
5348
return replicationEnabled.get();
@@ -84,35 +79,26 @@ public void close() throws IOException {
8479
}
8580

8681
/**
87-
* Replication status listener.
88-
*/
89-
class ReplicationStatusListener implements EventHandler<Boolean> {
90-
@Override
91-
public void onMessage(Boolean status, EventPublisher publisher) {
92-
replicationStatusSetExternally.set(true);
93-
replicationEnabled.set(status);
94-
}
95-
}
96-
97-
/**
98-
* Replication status is influenced by Chill mode status as well.
82+
* Waits for
83+
* {@link HddsConfigKeys#HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT} and set
84+
* replicationEnabled to start replication monitor thread.
9985
*/
100-
class ChillModeStatusListener implements EventHandler<Boolean> {
101-
102-
@Override
103-
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
104-
if (!replicationStatusSetExternally.get()) {
105-
replicationEnabled.set(!inChillMode);
106-
}
86+
public void fireReplicationStart(boolean chillModeStatus,
87+
long waitTime) {
88+
if (!chillModeStatus) {
89+
CompletableFuture.runAsync(() -> {
90+
try {
91+
Thread.sleep(waitTime);
92+
} catch (InterruptedException ex) {
93+
LOG.error("Interrupted during wait, replication event is not fired",
94+
ex);
95+
}
96+
setReplicationEnabled(true);
97+
LOG.info("Replication Timer sleep for {} ms completed. Enable " +
98+
"Replication", waitTime);
99+
});
107100
}
108101
}
109102

110-
public ReplicationStatusListener getReplicationStatusListener() {
111-
return replicationStatusListener;
112-
}
113-
114-
public ChillModeStatusListener getChillModeStatusListener() {
115-
return chillModeStatusListener;
116-
}
117103

118104
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2323
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
24+
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
2425
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
2526
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
2627
.ReplicationStatus;
@@ -253,8 +254,8 @@ public final class SCMEvents {
253254
*/
254255
public static final TypedEvent<Boolean> START_REPLICATION =
255256
new TypedEvent<>(Boolean.class);
256-
public static final TypedEvent<Boolean> CHILL_MODE_STATUS =
257-
new TypedEvent<>(Boolean.class);
257+
public static final TypedEvent<ChillModeStatus> CHILL_MODE_STATUS =
258+
new TypedEvent<>(ChillModeStatus.class);
258259

259260
/**
260261
* Private Ctor. Never Constructed.

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
5050
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
5151
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
52-
import org.apache.hadoop.hdds.server.events.EventHandler;
53-
import org.apache.hadoop.hdds.server.events.EventPublisher;
5452
import org.apache.hadoop.io.IOUtils;
5553
import org.apache.hadoop.ipc.ProtobufRpcEngine;
5654
import org.apache.hadoop.ipc.RPC;
@@ -95,7 +93,7 @@
9593
* The RPC server that listens to requests from clients.
9694
*/
9795
public class SCMClientProtocolServer implements
98-
StorageContainerLocationProtocol, EventHandler<Boolean>, Auditor {
96+
StorageContainerLocationProtocol, Auditor {
9997
private static final Logger LOG =
10098
LoggerFactory.getLogger(SCMClientProtocolServer.class);
10199
private static final AuditLogger AUDIT =
@@ -496,14 +494,6 @@ public StorageContainerManager getScm() {
496494
return scm;
497495
}
498496

499-
/**
500-
* Set chill mode status based on SCMEvents.CHILL_MODE_STATUS event.
501-
*/
502-
@Override
503-
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
504-
chillModePrecheck.setInChillMode(inChillMode);
505-
}
506-
507497
/**
508498
* Set chill mode status based on .
509499
*/
@@ -561,4 +551,13 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
561551
public void close() throws IOException {
562552
stop();
563553
}
554+
555+
/**
556+
* Set ChillMode status.
557+
*
558+
* @param chillModeStatus
559+
*/
560+
public void setChillModeStatus(boolean chillModeStatus) {
561+
chillModePrecheck.setInChillMode(chillModeStatus);
562+
}
564563
}

0 commit comments

Comments
 (0)