
Commit 0352d69

HBASE-27218 Support rolling upgrading
1 parent de2d20a commit 0352d69

File tree

13 files changed: +816 -16 lines

13 files changed

+816
-16
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java

Lines changed: 6 additions & 2 deletions
@@ -200,7 +200,11 @@ public String getRsPath(ServerName sn) {
    * @param suffix ending of znode name
    * @return result of properly joining prefix with suffix
    */
-  public static String joinZNode(String prefix, String suffix) {
-    return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
+  public static String joinZNode(String prefix, String... suffix) {
+    StringBuilder sb = new StringBuilder(prefix);
+    for (String s : suffix) {
+      sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
+    }
+    return sb.toString();
   }
 }
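
With the varargs signature, callers can join several ZNode path segments in one call instead of nesting joinZNode invocations. A minimal usage sketch (the segment names below are illustrative, not taken from this commit):

    // Each segment is appended with the ZNode path separator ('/'), so
    // joinZNode("/hbase", "replication", "peers") returns "/hbase/replication/peers".
    String peersZNode = ZNodePaths.joinZNode("/hbase", "replication", "peers");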

hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java

Lines changed: 15 additions & 0 deletions
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.metrics.Counter;
 import org.apache.hadoop.hbase.metrics.Histogram;
@@ -33,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

 /**
@@ -1011,6 +1013,19 @@ final void doReleaseLock(TEnvironment env, ProcedureStore store) {
     releaseLock(env);
   }

+  protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
+    throws ProcedureSuspendedException {
+    if (jitter) {
+      // 10% possible jitter
+      double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
+      timeoutMillis += add;
+    }
+    setTimeout(timeoutMillis);
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+    throw new ProcedureSuspendedException();
+  }
+
   @Override
   public int compareTo(final Procedure<TEnvironment> other) {
     return Long.compare(getProcId(), other.getProcId());
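
Because suspend(...) declares ProcedureSuspendedException as its return type and also throws it, a subclass can write `throw suspend(...)`, which tells the compiler the statement never completes normally. Note that skipPersistence() means the suspension itself is not written to the procedure store, so after a master restart the procedure re-runs from its last persisted state. A hedged sketch of typical use inside a procedure step (the surrounding method and the 5000 ms backoff are hypothetical):

    // Hypothetical retry point inside a Procedure subclass.
    if (!resourceReady(env)) {
      // Suspend for ~5s plus up to 10% random jitter, then retry on timeout.
      throw suspend(5000, true);
    }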

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 13 additions & 0 deletions
@@ -719,3 +719,16 @@ enum AssignReplicationQueuesState {
 message AssignReplicationQueuesStateData {
   required ServerName crashed_server = 1;
 }
+
+enum MigrateReplicationQueueFromZkToTableState {
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
+}
+
+message MigrateReplicationQueueFromZkToTableStateData {
+  repeated string peer_id = 1;
+  repeated string disabled_peer_id = 2;
+}
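
The enum encodes the migration flow: prepare, disable the replication peers, copy the queue data out of ZooKeeper, wait until every region server has been upgraded, then re-enable the peers; peer_id presumably tracks the peers being migrated and disabled_peer_id the ones the procedure itself disabled and must re-enable. The procedure class is not part of this excerpt; the skeleton below is an assumption based on HBase's StateMachineProcedure pattern, showing how such states typically drive execution:

    // Assumed skeleton only; the real MigrateReplicationQueueFromZkToTableProcedure
    // is not shown in this diff.
    @Override
    protected Flow executeFromState(MasterProcedureEnv env,
        MigrateReplicationQueueFromZkToTableState state) throws ProcedureSuspendedException {
      switch (state) {
        case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
          // e.g. snapshot the peer list into peer_id / disabled_peer_id
          setNextState(MigrateReplicationQueueFromZkToTableState
            .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
          return Flow.HAS_MORE_STATE;
        // ... DISABLE_PEER, MIGRATE and WAIT_UPGRADING advance the same way ...
        case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
          // e.g. re-enable only the peers recorded in disabled_peer_id
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    }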

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java

Lines changed: 9 additions & 0 deletions
@@ -22,6 +22,7 @@
 import java.util.Set;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;

@@ -178,4 +179,12 @@ void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    * created hfile references during the call may not be included.
    */
   Set<String> getAllHFileRefs() throws ReplicationException;
+
+  // the below 3 methods are used for migrating
+  void batchUpdate(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException;
+
+  void batchUpdate(List<ZkLastPushedSeqId> lastPushedSeqIds) throws ReplicationException;
+
+  void batchUpdate(String peerId, List<String> hfileRefs) throws ReplicationException;
 }
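
The three overloads give the migration one bulk write per category of ZooKeeper state: queue offsets per server, last-pushed sequence ids, and hfile references. An illustrative sketch of how a migration step might drive them (variable names are hypothetical):

    // Copy state read from the old ZK-based storage into the table-backed storage.
    ReplicationQueueStorage dst = ...; // the table-backed implementation
    dst.batchUpdate(serverName, queueDatas);   // replication queue offsets
    dst.batchUpdate(lastPushedSeqIds);         // last pushed sequence ids per region
    dst.batchUpdate(peerId, hfileRefs);        // hfile references for one peer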

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java

Lines changed: 50 additions & 0 deletions
@@ -21,12 +21,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -46,6 +48,7 @@
 import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -532,4 +535,51 @@ public Set<String> getAllHFileRefs() throws ReplicationException {
       throw new ReplicationException("failed to getAllHFileRefs", e);
     }
   }
+
+  @Override
+  public void batchUpdate(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException {
+    List<Put> puts = new ArrayList<>();
+    for (ReplicationQueueData data : datas) {
+      Put put = new Put(Bytes.toBytes(data.getId().toString()));
+      data.getOffsets().forEach((walGroup, offset) -> {
+        put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
+      });
+      puts.add(put);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(puts);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update queues", e);
+    }
+  }
+
+  @Override
+  public void batchUpdate(List<ZkLastPushedSeqId> lastPushedSeqIds) throws ReplicationException {
+    Map<String, Put> peerId2Put = new HashMap<>();
+    for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
+      peerId2Put
+        .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
+        .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
+          Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(peerId2Put.values().stream().collect(Collectors.toList()));
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update last pushed sequence ids", e);
+    }
+  }
+
+  @Override
+  public void batchUpdate(String peerId, List<String> hfileRefs) throws ReplicationException {
+    Put put = new Put(Bytes.toBytes(peerId));
+    for (String ref : hfileRefs) {
+      put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(put);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update hfile references", e);
+    }
+  }
 }
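
Each overload writes to a different column family of the replication queue table, so the implied row layout is (reading it off the code above):

    // queue offsets:      row = ReplicationQueueData id, family = QUEUE_FAMILY,
    //                     qualifier = WAL group, value = serialized offset
    // last pushed seq id: row = peer id, family = LAST_SEQUENCE_ID_FAMILY,
    //                     qualifier = encoded region name, value = sequence id
    // hfile refs:         row = peer id, family = HFILE_REF_FAMILY,
    //                     qualifier = hfile name, value = empty

Grouping the last-pushed entries into one Put per peer keeps each row's update atomic and avoids issuing a separate mutation per region.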
