Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -51,10 +52,10 @@
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.IOException;
import java.io.InterruptedIOException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -33,6 +32,7 @@
import org.apache.zookeeper.data.Stat;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
Expand All @@ -57,6 +57,9 @@
*/
@InterfaceAudience.Private
public class MasterAddressTracker extends ZKNodeTracker {

private volatile List<ServerName> backupMasters = Collections.emptyList();

/**
* Construct a master address listener with the specified
* <code>zookeeper</code> reference.
Expand All @@ -72,6 +75,26 @@ public MasterAddressTracker(ZKWatcher watcher, Abortable abortable) {
super(watcher, watcher.getZNodePaths().masterAddressZNode, abortable);
}

private void loadBackupMasters() {
try {
backupMasters = Collections.unmodifiableList(getBackupMastersAndRenewWatch(watcher));
} catch (InterruptedIOException e) {
abortable.abort("Unexpected exception handling nodeChildrenChanged event", e);
}
}

@Override
protected void postStart() {
loadBackupMasters();
}

@Override
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.getZNodePaths().backupMasterAddressesZNode)) {
loadBackupMasters();
}
}

/**
* Get the address of the current master if one is available. Returns null
* if no current master.
Expand Down Expand Up @@ -252,11 +275,12 @@ public static ZooKeeperProtos.Master parse(byte[] data) throws DeserializationEx
}
int prefixLen = ProtobufUtil.lengthOfPBMagic();
try {
return ZooKeeperProtos.Master.PARSER.parseFrom(data, prefixLen, data.length - prefixLen);
return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}

/**
* delete the master znode if its content is same as the parameter
* @param zkw must not be null
Expand Down Expand Up @@ -284,7 +308,7 @@ public static boolean deleteIfEquals(ZKWatcher zkw, final String content) {
}

public List<ServerName> getBackupMasters() throws InterruptedIOException {
return getBackupMastersAndRenewWatch(watcher);
return backupMasters;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class ZKNodeTracker extends ZKListener {
protected final String node;

/** Data of the node being tracked */
private byte [] data;
private byte[] data;

/** Used to abort if a fatal error occurs */
protected final Abortable abortable;
Expand All @@ -51,16 +51,14 @@ public abstract class ZKNodeTracker extends ZKListener {

/**
* Constructs a new ZK node tracker.
*
* <p>After construction, use {@link #start} to kick off tracking.
*
* <p/>
* After construction, use {@link #start} to kick off tracking.
* @param watcher reference to the {@link ZKWatcher} which also contains configuration and
* constants
* constants
* @param node path of the node being tracked
* @param abortable used to abort if a fatal error occurs
*/
public ZKNodeTracker(ZKWatcher watcher, String node,
Abortable abortable) {
public ZKNodeTracker(ZKWatcher watcher, String node, Abortable abortable) {
super(watcher);
this.node = node;
this.abortable = abortable;
Expand All @@ -69,9 +67,9 @@ public ZKNodeTracker(ZKWatcher watcher, String node,

/**
* Starts the tracking of the node in ZooKeeper.
*
* <p>Use {@link #blockUntilAvailable()} to block until the node is available
* or {@link #getData(boolean)} to get the data of the node if it is available.
* <p/>
* Use {@link #blockUntilAvailable()} to block until the node is available or
* {@link #getData(boolean)} to get the data of the node if it is available.
*/
public synchronized void start() {
this.watcher.registerListener(this);
Expand All @@ -89,6 +87,13 @@ public synchronized void start() {
} catch (KeeperException e) {
abortable.abort("Unexpected exception during initialization, aborting", e);
}
postStart();
}

/**
* Called after start is called. Sub classes could implement this method to load more data on zk.
*/
protected void postStart() {
}

public synchronized void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
package org.apache.hadoop.hbase.zookeeper;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -25,16 +25,11 @@
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -46,18 +41,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({RegionServerTests.class, MediumTests.class})
@Category({ ZKTests.class, MediumTests.class })
public class TestMasterAddressTracker {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterAddressTracker.class);
HBaseClassTestRule.forClass(TestMasterAddressTracker.class);

private static final Logger LOG = LoggerFactory.getLogger(TestMasterAddressTracker.class);

private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private final static HBaseZKTestingUtil TEST_UTIL = new HBaseZKTestingUtil();

// Cleaned up after each unit test.
private static ZKWatcher zk;
private ZKWatcher zk;

@Rule
public TestName name = new TestName();
Expand All @@ -81,15 +77,15 @@ public static void tearDownAfterClass() throws Exception {

@Test
public void testDeleteIfEquals() throws Exception {
final ServerName sn = ServerName.valueOf("localhost", 1234,
EnvironmentEdgeManager.currentTime());
final ServerName sn =
ServerName.valueOf("localhost", 1234, EnvironmentEdgeManager.currentTime());
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1772);
try {
assertFalse("shouldn't have deleted wrong master server.",
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), "some other string."));
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), "some other string."));
} finally {
assertTrue("Couldn't clean up master",
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
}
}

Expand All @@ -99,9 +95,8 @@ public void testDeleteIfEquals() throws Exception {
* @param infoPort if there is an active master, set its info port.
*/
private MasterAddressTracker setupMasterTracker(final ServerName sn, final int infoPort)
throws Exception {
zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
name.getMethodName(), null);
throws Exception {
zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null);
ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode);
ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().backupMasterAddressesZNode);

Expand All @@ -112,14 +107,14 @@ private MasterAddressTracker setupMasterTracker(final ServerName sn, final int i
zk.registerListener(addressTracker);

// Use a listener to capture when the node is actually created
NodeCreationListener listener = new NodeCreationListener(zk,
zk.getZNodePaths().masterAddressZNode);
NodeCreationListener listener =
new NodeCreationListener(zk, zk.getZNodePaths().masterAddressZNode);
zk.registerListener(listener);

if (sn != null) {
LOG.info("Creating master node");
MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode,
sn, infoPort);
MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode, sn,
infoPort);

// Wait for the node to be created
LOG.info("Waiting for master address manager to be notified");
Expand All @@ -130,16 +125,15 @@ private MasterAddressTracker setupMasterTracker(final ServerName sn, final int i
}

/**
* Unit tests that uses ZooKeeper but does not use the master-side methods
* but rather acts directly on ZK.
* @throws Exception
* Unit tests that uses ZooKeeper but does not use the master-side methods but rather acts
* directly on ZK.
*/
@Test
public void testMasterAddressTrackerFromZK() throws Exception {
// Create the master node with a dummy address
final int infoPort = 1235;
final ServerName sn = ServerName.valueOf("localhost", 1234,
EnvironmentEdgeManager.currentTime());
final ServerName sn =
ServerName.valueOf("localhost", 1234, EnvironmentEdgeManager.currentTime());
final MasterAddressTracker addressTracker = setupMasterTracker(sn, infoPort);
try {
assertTrue(addressTracker.hasMaster());
Expand All @@ -148,29 +142,27 @@ public void testMasterAddressTrackerFromZK() throws Exception {
assertEquals(infoPort, addressTracker.getMasterInfoPort());
} finally {
assertTrue("Couldn't clean up master",
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
}
}


@Test
public void testParsingNull() throws Exception {
assertNull("parse on null data should return null.", MasterAddressTracker.parse(null));
}

@Test
public void testNoBackups() throws Exception {
final ServerName sn = ServerName.valueOf("localhost", 1234,
EnvironmentEdgeManager.currentTime());
final ServerName sn =
ServerName.valueOf("localhost", 1234, EnvironmentEdgeManager.currentTime());
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1772);
try {
assertEquals("Should receive 0 for backup not found.", 0,
addressTracker.getBackupMasterInfoPort(
ServerName.valueOf("doesnotexist.example.com", 1234,
EnvironmentEdgeManager.currentTime())));
addressTracker.getBackupMasterInfoPort(ServerName.valueOf("doesnotexist.example.com", 1234,
EnvironmentEdgeManager.currentTime())));
} finally {
assertTrue("Couldn't clean up master",
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
}
}

Expand All @@ -184,8 +176,8 @@ public void testNoMaster() throws Exception {

@Test
public void testBackupMasters() throws Exception {
final ServerName sn = ServerName.valueOf("localhost", 5678,
EnvironmentEdgeManager.currentTime());
final ServerName sn =
ServerName.valueOf("localhost", 5678, EnvironmentEdgeManager.currentTime());
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1111);
assertTrue(addressTracker.hasMaster());
ServerName activeMaster = addressTracker.getMasterAddress();
Expand All @@ -195,13 +187,14 @@ public void testBackupMasters() throws Exception {
assertEquals(0, backupMasters.size());
ServerName backupMaster1 = ServerName.valueOf("localhost", 2222, -1);
ServerName backupMaster2 = ServerName.valueOf("localhost", 3333, -1);
String backupZNode1 = ZNodePaths.joinZNode(
zk.getZNodePaths().backupMasterAddressesZNode, backupMaster1.toString());
String backupZNode2 = ZNodePaths.joinZNode(
zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString());
// Add a backup master
String backupZNode1 =
ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupMaster1.toString());
String backupZNode2 =
ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString());
// Add backup masters
MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222);
MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333);
TEST_UTIL.waitFor(30000, () -> addressTracker.getBackupMasters().size() == 2);
backupMasters = addressTracker.getBackupMasters();
assertEquals(2, backupMasters.size());
assertTrue(backupMasters.contains(backupMaster1));
Expand All @@ -222,7 +215,7 @@ public NodeCreationListener(ZKWatcher watcher, String node) {

@Override
public void nodeCreated(String path) {
if(path.equals(node)) {
if (path.equals(node)) {
LOG.debug("nodeCreated(" + path + ")");
lock.release();
}
Expand All @@ -232,6 +225,4 @@ public void waitForCreation() throws InterruptedException {
lock.acquire();
}
}

}