
Commit fbb11e7

HBASE-27218 Support rolling upgrading (#4808)
Signed-off-by: Yu Li <[email protected]>
1 parent: 8440ee2

22 files changed: +1917 lines, -16 lines

hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java

Lines changed: 6 additions & 2 deletions
```diff
@@ -220,7 +220,11 @@ public String getRsPath(ServerName sn) {
    * @param suffix ending of znode name
    * @return result of properly joining prefix with suffix
    */
-  public static String joinZNode(String prefix, String suffix) {
-    return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
+  public static String joinZNode(String prefix, String... suffix) {
+    StringBuilder sb = new StringBuilder(prefix);
+    for (String s : suffix) {
+      sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
+    }
+    return sb.toString();
   }
 }
```
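The varargs overload stays source-compatible with existing two-argument callers while letting several path components be joined in one call. A minimal usage sketch (the znode names here are made up for illustration):

```java
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;

public class JoinZNodeExample {
  public static void main(String[] args) {
    // Single suffix: same result as the old two-argument signature.
    System.out.println(ZNodePaths.joinZNode("/hbase", "rs"));
    // -> /hbase/rs

    // Multiple suffixes: each component is appended behind a '/' separator.
    System.out.println(ZNodePaths.joinZNode("/hbase", "replication", "peers"));
    // -> /hbase/replication/peers
  }
}
```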

hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java

Lines changed: 15 additions & 0 deletions
```diff
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.metrics.Counter;
 import org.apache.hadoop.hbase.metrics.Histogram;
@@ -33,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 
 /**
@@ -1011,6 +1013,19 @@ final void doReleaseLock(TEnvironment env, ProcedureStore store) {
     releaseLock(env);
   }
 
+  protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
+    throws ProcedureSuspendedException {
+    if (jitter) {
+      // 10% possible jitter
+      double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
+      timeoutMillis += add;
+    }
+    setTimeout(timeoutMillis);
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+    throw new ProcedureSuspendedException();
+  }
+
   @Override
   public int compareTo(final Procedure<TEnvironment> other) {
     return Long.compare(getProcId(), other.getProcId());
```
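The helper packages a common suspend-and-retry idiom: optionally stretch the timeout by up to 10% random jitter (so a batch of procedures suspended together does not wake in lock step), move to WAITING_TIMEOUT without persisting the transition, and throw. Declaring ProcedureSuspendedException as the return type lets call sites write `throw suspend(...)`, which tells the compiler the code path ends there. A minimal sketch of a subclass using it; the class and its readiness check are hypothetical, and the remaining abstract Procedure methods are elided:

```java
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;

// Hypothetical procedure that backs off and retries until a precondition holds.
public class WaitForReadyProcedure extends Procedure<MasterProcedureEnv> {

  @Override
  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureSuspendedException {
    if (!isReady(env)) {
      // Suspend for ~5s plus up to 10% jitter; the timeout wakes us up again.
      // A real subclass would also override setTimeoutFailure to re-schedule
      // itself instead of treating the timeout as a failure.
      throw suspend(5000, true);
    }
    return null; // done, no child procedures
  }

  private boolean isReady(MasterProcedureEnv env) {
    return true; // placeholder for a real precondition
  }

  // rollback/abort/serializeStateData/deserializeStateData elided for brevity
}
```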

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 12 additions & 0 deletions
```diff
@@ -722,3 +722,15 @@ enum AssignReplicationQueuesState {
 message AssignReplicationQueuesStateData {
   required ServerName crashed_server = 1;
 }
+
+enum MigrateReplicationQueueFromZkToTableState {
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
+}
+
+message MigrateReplicationQueueFromZkToTableStateData {
+  repeated string disabled_peer_id = 1;
+}
```
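The enum spells out the migration flow: prepare, disable the replication peers, copy the ZooKeeper queue data into the new table, wait until every region server has been upgraded, then re-enable the peers; the repeated disabled_peer_id field records which peers were disabled so that only those get re-enabled. A sketch of how a StateMachineProcedure would typically walk these states follows; it is an illustration, not the MigrateReplicationQueueFromZkToTableProcedure this commit actually adds:

```java
// Illustrative executeFromState body; the real procedure may differ in details.
protected Flow executeFromState(MasterProcedureEnv env,
  MigrateReplicationQueueFromZkToTableState state) {
  switch (state) {
    case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
      // Check there is zookeeper data to migrate; note which peers are enabled.
      setNextState(MigrateReplicationQueueFromZkToTableState
        .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
      return Flow.HAS_MORE_STATE;
    case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
      // Disable those peers, recording their ids in disabled_peer_id.
      setNextState(MigrateReplicationQueueFromZkToTableState
        .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
      return Flow.HAS_MORE_STATE;
    case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
      // Copy queue offsets, last pushed sequence ids and hfile refs to the table.
      setNextState(MigrateReplicationQueueFromZkToTableState
        .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
      return Flow.HAS_MORE_STATE;
    case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
      // Wait until no region server is still running the pre-upgrade code.
      setNextState(MigrateReplicationQueueFromZkToTableState
        .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
      return Flow.HAS_MORE_STATE;
    case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
      // Re-enable exactly the peers recorded in disabled_peer_id.
      return Flow.NO_MORE_STATE;
    default:
      throw new UnsupportedOperationException("unhandled state=" + state);
  }
}
```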

hbase-replication/pom.xml

Lines changed: 10 additions & 0 deletions
```diff
@@ -98,6 +98,16 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
```

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java

Lines changed: 19 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@
 import java.util.Set;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -184,4 +185,22 @@ void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    * @return Whether the replication queue table exists
    */
   boolean hasData() throws ReplicationException;
+
+  // the below 3 methods are used for migrating
+  /**
+   * Update the replication queue datas for a given region server.
+   */
+  void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException;
+
+  /**
+   * Update last pushed sequence id for the given regions and peers.
+   */
+  void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+    throws ReplicationException;
+
+  /**
+   * Add the given hfile refs to the given peer.
+   */
+  void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
 }
```
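Together these three methods let the migration procedure replay a region server's ZooKeeper-based replication state into the new table-backed storage in bulk. A minimal sketch of the calling pattern, assuming the zk-side data has already been read out through ZKReplicationQueueStorageForMigration (all variable and method names here are made up):

```java
// Hypothetical migration driver for one region server's replication state.
void migrateOneServer(ReplicationQueueStorage tableStorage, ServerName server,
  List<ReplicationQueueData> zkQueues, List<ZkLastPushedSeqId> zkLastSeqIds,
  String peerId, List<String> zkHFileRefs) throws ReplicationException {
  // Replay the server's queue offsets into the table in one batch.
  tableStorage.batchUpdateQueues(server, zkQueues);
  // Replay per-region last pushed sequence ids, grouped internally by peer.
  tableStorage.batchUpdateLastSequenceIds(zkLastSeqIds);
  // Replay the pending bulk-load hfile references for one peer.
  tableStorage.batchUpdateHFileRefs(peerId, zkHFileRefs);
}
```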

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java

Lines changed: 59 additions & 6 deletions
```diff
@@ -21,12 +21,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -46,6 +48,7 @@
 import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
 
   private final TableName tableName;
 
-  @FunctionalInterface
-  private interface TableCreator {
-
-    void create() throws IOException;
-  }
-
   public TableReplicationQueueStorage(Connection conn, TableName tableName) {
     this.conn = conn;
     this.tableName = tableName;
@@ -541,4 +538,60 @@ public boolean hasData() throws ReplicationException {
       throw new ReplicationException("failed to get replication queue table", e);
     }
   }
+
+  @Override
+  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException {
+    List<Put> puts = new ArrayList<>();
+    for (ReplicationQueueData data : datas) {
+      if (data.getOffsets().isEmpty()) {
+        continue;
+      }
+      Put put = new Put(Bytes.toBytes(data.getId().toString()));
+      data.getOffsets().forEach((walGroup, offset) -> {
+        put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
+      });
+      puts.add(put);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(puts);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update queues", e);
+    }
+  }
+
+  @Override
+  public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+    throws ReplicationException {
+    Map<String, Put> peerId2Put = new HashMap<>();
+    for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
+      peerId2Put
+        .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
+        .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
+          Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table
+        .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update last pushed sequence ids", e);
+    }
+  }
+
+  @Override
+  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
+    throws ReplicationException {
+    if (hfileRefs.isEmpty()) {
+      return;
+    }
+    Put put = new Put(Bytes.toBytes(peerId));
+    for (String ref : hfileRefs) {
+      put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(put);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update hfile references", e);
+    }
+  }
 }
```
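Two details worth noting in this implementation: batchUpdateLastSequenceIds folds every region of a peer into a single Put keyed by the peer id (one row per peer rather than one Put per region) and filters out empty Puts before the batch write, while batchUpdateHFileRefs stores each hfile name as an empty-valued column under the peer's row. The now-unused private TableCreator interface is removed in the same change.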
