diff --git a/LICENSE-binary b/LICENSE-binary
index be7b3cba387ac..ccf900bf1aa90 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -316,9 +316,9 @@ org.apache.commons:commons-lang3:3.12.0
 org.apache.commons:commons-math3:3.1.1
 org.apache.commons:commons-text:1.10.0
 org.apache.commons:commons-validator:1.6
-org.apache.curator:curator-client:4.2.0
-org.apache.curator:curator-framework:4.2.0
-org.apache.curator:curator-recipes:4.2.0
+org.apache.curator:curator-client:5.2.0
+org.apache.curator:curator-framework:5.2.0
+org.apache.curator:curator-recipes:5.2.0
 org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1
 org.apache.hbase:hbase-annotations:1.4.8
 org.apache.hbase:hbase-client:1.4.8
@@ -345,8 +345,6 @@ org.apache.kerby:kerby-util:1.0.1
 org.apache.kerby:kerby-xdr:1.0.1
 org.apache.kerby:token-provider:1.0.1
 org.apache.yetus:audience-annotations:0.5.0
-org.apache.zookeeper:zookeeper:3.5.6
-org.apache.zookeeper:zookeeper-jute:3.5.6
 org.codehaus.jettison:jettison:1.5.1
 org.eclipse.jetty:jetty-annotations:9.4.48.v20220622
 org.eclipse.jetty:jetty-http:9.4.48.v20220622
@@ -362,6 +360,7 @@ org.eclipse.jetty:jetty-webapp:9.4.48.v20220622
 org.eclipse.jetty:jetty-xml:9.4.48.v20220622
 org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.48.v20220622
 org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.48.v20220622
+org.apache.zookeeper:zookeeper:3.6.3
 org.ehcache:ehcache:3.3.1
 org.lz4:lz4-java:1.7.1
 org.objenesis:objenesis:2.6
diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml
index 6405a8feba7c7..d93372908a956 100644
--- a/hadoop-common-project/hadoop-auth/pom.xml
+++ b/hadoop-common-project/hadoop-auth/pom.xml
@@ -128,6 +128,15 @@
       org.apache.zookeeper
       zookeeper
     
+    
+      io.dropwizard.metrics
+      metrics-core
+    
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
     
       org.apache.curator
       curator-framework
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 2b0e7c8079fa0..f31d9afa3bf67 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -336,6 +336,10 @@
         
       
     
+    
+      io.dropwizard.metrics
+      metrics-core
+    
     
       org.apache.zookeeper
       zookeeper
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
index 666000b2f48ae..2863a39f14226 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -41,7 +41,6 @@
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -60,10 +59,10 @@
  * we run these tests with the upstream ClientBase.
  */
 public abstract class ClientBaseWithFixes extends ZKTestCase {
-    protected static final Logger LOG = LoggerFactory.getLogger(ClientBaseWithFixes.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(ClientBaseWithFixes.class);
 
-    public static int CONNECTION_TIMEOUT = 30000;
-    static final File BASETEST = GenericTestUtils.getTestDir();
+  public static int CONNECTION_TIMEOUT = 30000;
+  static final File BASETEST = GenericTestUtils.getTestDir();
 
   static {
     // The 4-letter-words commands are simple diagnostics telnet commands in
@@ -74,411 +73,409 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
   }
 
-    protected final String hostPort = initHostPort();
-    protected int maxCnxns = 0;
-    protected ServerCnxnFactory serverFactory = null;
-    protected File tmpDir = null;
+  protected final String hostPort = initHostPort();
+  protected int maxCnxns = 0;
+  protected ServerCnxnFactory serverFactory = null;
+  protected File tmpDir = null;
+  
+  long initialFdCount;
     
-    long initialFdCount;
-    
-    /**
-     * In general don't use this. Only use in the special case that you
-     * want to ignore results (for whatever reason) in your test. Don't
-     * use empty watchers in real code!
-     *
-     */
-    protected class NullWatcher implements Watcher {
-        @Override
-        public void process(WatchedEvent event) { /* nada */ }
-    }
-
-    protected static class CountdownWatcher implements Watcher {
-        // XXX this doesn't need to be volatile! (Should probably be final)
-        volatile CountDownLatch clientConnected;
-        volatile boolean connected;
-        protected ZooKeeper client;
-
-        public void initializeWatchedClient(ZooKeeper zk) {
-            if (client != null) {
-                throw new RuntimeException("Watched Client was already set");
-            }
-            client = zk;
-        }
+  /**
+   * In general don't use this. Only use in the special case that you
+   * want to ignore results (for whatever reason) in your test. Don't
+   * use empty watchers in real code!
+   *
+   */
+  protected class NullWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent event) { /* nada */ }
+  }
 
-        public CountdownWatcher() {
-            reset();
-        }
-        synchronized public void reset() {
-            clientConnected = new CountDownLatch(1);
-            connected = false;
-        }
-        @Override
-        synchronized public void process(WatchedEvent event) {
-            if (event.getState() == KeeperState.SyncConnected ||
-                event.getState() == KeeperState.ConnectedReadOnly) {
-                connected = true;
-                notifyAll();
-                clientConnected.countDown();
-            } else {
-                connected = false;
-                notifyAll();
-            }
-        }
-        synchronized boolean isConnected() {
-            return connected;
-        }
-        @VisibleForTesting
-        public synchronized void waitForConnected(long timeout)
-            throws InterruptedException, TimeoutException {
-            long expire = Time.now() + timeout;
-            long left = timeout;
-            while(!connected && left > 0) {
-                wait(left);
-                left = expire - Time.now();
-            }
-            if (!connected) {
-                throw new TimeoutException("Did not connect");
-
-            }
-        }
-        @VisibleForTesting
-        public synchronized void waitForDisconnected(long timeout)
-            throws InterruptedException, TimeoutException {
-            long expire = Time.now() + timeout;
-            long left = timeout;
-            while(connected && left > 0) {
-                wait(left);
-                left = expire - Time.now();
-            }
-            if (connected) {
-                throw new TimeoutException("Did not disconnect");
-
-            }
-        }
+  protected static class CountdownWatcher implements Watcher {
+    // XXX this doesn't need to be volatile! (Should probably be final)
+    volatile CountDownLatch clientConnected;
+    volatile boolean connected;
+    protected ZooKeeper client;
+
+    public void initializeWatchedClient(ZooKeeper zk) {
+      if (client != null) {
+          throw new RuntimeException("Watched Client was already set");
+      }
+      client = zk;
     }
 
-    protected TestableZooKeeper createClient()
-        throws IOException, InterruptedException
-    {
-        return createClient(hostPort);
+    public CountdownWatcher() {
+      reset();
     }
-
-    protected TestableZooKeeper createClient(String hp)
-        throws IOException, InterruptedException
-    {
-        CountdownWatcher watcher = new CountdownWatcher();
-        return createClient(watcher, hp);
+    synchronized public void reset() {
+      clientConnected = new CountDownLatch(1);
+      connected = false;
     }
-
-    private LinkedList allClients;
-    private boolean allClientsSetup = false;
-
-    protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
-        throws IOException, InterruptedException
-    {
-        return createClient(watcher, hp, CONNECTION_TIMEOUT);
+    @Override
+    synchronized public void process(WatchedEvent event) {
+      if (event.getState() == KeeperState.SyncConnected ||
+          event.getState() == KeeperState.ConnectedReadOnly) {
+          connected = true;
+          notifyAll();
+          clientConnected.countDown();
+      } else {
+          connected = false;
+          notifyAll();
+      }
     }
-
-    protected TestableZooKeeper createClient(CountdownWatcher watcher,
-            String hp, int timeout)
-        throws IOException, InterruptedException
-    {
-        watcher.reset();
-        TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
-        if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
-        {
-            Assert.fail("Unable to connect to server");
-        }
-        synchronized(this) {
-            if (!allClientsSetup) {
-                LOG.error("allClients never setup");
-                Assert.fail("allClients never setup");
-            }
-            if (allClients != null) {
-                allClients.add(zk);
-            } else {
-                // test done - close the zk, not needed
-                zk.close();
-            }
-        }
-        watcher.initializeWatchedClient(zk);
-        return zk;
+    synchronized boolean isConnected() {
+      return connected;
     }
-
-    public static class HostPort {
-        String host;
-        int port;
-        public HostPort(String host, int port) {
-            this.host = host;
-            this.port = port;
-        }
+    @VisibleForTesting
+    public synchronized void waitForConnected(long timeout)
+        throws InterruptedException, TimeoutException {
+      long expire = Time.now() + timeout;
+      long left = timeout;
+      while(!connected && left > 0) {
+          wait(left);
+          left = expire - Time.now();
+      }
+      if (!connected) {
+          throw new TimeoutException("Did not connect");
+
+      }
     }
-    public static List parseHostPortList(String hplist) {
-        ArrayList alist = new ArrayList();
-        for (String hp: hplist.split(",")) {
-            int idx = hp.lastIndexOf(':');
-            String host = hp.substring(0, idx);
-            int port;
-            try {
-                port = Integer.parseInt(hp.substring(idx + 1));
-            } catch(RuntimeException e) {
-                throw new RuntimeException("Problem parsing " + hp + e.toString());
-            }
-            alist.add(new HostPort(host,port));
-        }
-        return alist;
+    @VisibleForTesting
+    public synchronized void waitForDisconnected(long timeout)
+        throws InterruptedException, TimeoutException {
+      long expire = Time.now() + timeout;
+      long left = timeout;
+      while(connected && left > 0) {
+          wait(left);
+          left = expire - Time.now();
+      }
+      if (connected) {
+          throw new TimeoutException("Did not disconnect");
+
+      }
     }
+  }
+
+  protected TestableZooKeeper createClient()
+      throws IOException, InterruptedException
+  {
+    return createClient(hostPort);
+  }
+
+  protected TestableZooKeeper createClient(String hp)
+      throws IOException, InterruptedException
+  {
+    CountdownWatcher watcher = new CountdownWatcher();
+    return createClient(watcher, hp);
+  }
+
+  private LinkedList allClients;
+  private boolean allClientsSetup = false;
 
-    /**
-     * Send the 4letterword
-     * @param host the destination host
-     * @param port the destination port
-     * @param cmd the 4letterword
-     * @return
-     * @throws IOException
-     */
-    public static String send4LetterWord(String host, int port, String cmd)
-        throws IOException
+  protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
+      throws IOException, InterruptedException
+  {
+    return createClient(watcher, hp, CONNECTION_TIMEOUT);
+  }
+
+  protected TestableZooKeeper createClient(CountdownWatcher watcher,
+          String hp, int timeout)
+      throws IOException, InterruptedException
+  {
+    watcher.reset();
+    TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
+    if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
     {
-        LOG.info("connecting to " + host + " " + port);
-        Socket sock = new Socket(host, port);
-        BufferedReader reader = null;
-        try {
-            OutputStream outstream = sock.getOutputStream();
-            outstream.write(cmd.getBytes());
-            outstream.flush();
-            // this replicates NC - close the output stream before reading
-            sock.shutdownOutput();
-
-            reader =
-                new BufferedReader(
-                        new InputStreamReader(sock.getInputStream()));
-            StringBuilder sb = new StringBuilder();
-            String line;
-            while((line = reader.readLine()) != null) {
-                sb.append(line + "\n");
-            }
-            return sb.toString();
-        } finally {
-            sock.close();
-            if (reader != null) {
-                reader.close();
-            }
-        }
+      Assert.fail("Unable to connect to server");
+    }
+    synchronized(this) {
+      if (!allClientsSetup) {
+          LOG.error("allClients never setup");
+          Assert.fail("allClients never setup");
+      }
+      if (allClients != null) {
+          allClients.add(zk);
+      } else {
+          // test done - close the zk, not needed
+          zk.close();
+      }
     }
+    watcher.initializeWatchedClient(zk);
+    return zk;
+  }
 
-    public static boolean waitForServerUp(String hp, long timeout) {
-        long start = Time.now();
-        while (true) {
-            try {
-                // if there are multiple hostports, just take the first one
-                HostPort hpobj = parseHostPortList(hp).get(0);
-                String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
-                if (result.startsWith("Zookeeper version:") &&
-                        !result.contains("READ-ONLY")) {
-                    return true;
-                }
-            } catch (IOException e) {
-                // ignore as this is expected
-                LOG.info("server " + hp + " not up " + e);
-            }
-
-            if (Time.now() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
+  public static class HostPort {
+    String host;
+    int port;
+    public HostPort(String host, int port) {
+      this.host = host;
+      this.port = port;
     }
-    public static boolean waitForServerDown(String hp, long timeout) {
-        long start = Time.now();
-        while (true) {
-            try {
-                HostPort hpobj = parseHostPortList(hp).get(0);
-                send4LetterWord(hpobj.host, hpobj.port, "stat");
-            } catch (IOException e) {
-                return true;
-            }
-
-            if (Time.now() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
+  }
+  public static List parseHostPortList(String hplist) {
+    ArrayList alist = new ArrayList();
+    for (String hp: hplist.split(",")) {
+      int idx = hp.lastIndexOf(':');
+      String host = hp.substring(0, idx);
+      int port;
+      try {
+          port = Integer.parseInt(hp.substring(idx + 1));
+      } catch(RuntimeException e) {
+          throw new RuntimeException("Problem parsing " + hp + e.toString());
+      }
+      alist.add(new HostPort(host,port));
     }
+    return alist;
+  }
 
-    public static File createTmpDir() throws IOException {
-        return createTmpDir(BASETEST);
-    }
-    static File createTmpDir(File parentDir) throws IOException {
-        File tmpFile = File.createTempFile("test", ".junit", parentDir);
-        // don't delete tmpFile - this ensures we don't attempt to create
-        // a tmpDir with a duplicate name
-        File tmpDir = new File(tmpFile + ".dir");
-        Assert.assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
-        Assert.assertTrue(tmpDir.mkdirs());
-
-        return tmpDir;
+  /**
+   * Send the 4letterword
+   * @param host the destination host
+   * @param port the destination port
+   * @param cmd the 4letterword
+   * @return
+   * @throws IOException
+   */
+  public static String send4LetterWord(String host, int port, String cmd)
+      throws IOException
+  {
+    LOG.info("connecting to " + host + " " + port);
+    Socket sock = new Socket(host, port);
+    BufferedReader reader = null;
+    try {
+      OutputStream outstream = sock.getOutputStream();
+      outstream.write(cmd.getBytes());
+      outstream.flush();
+      // this replicates NC - close the output stream before reading
+      sock.shutdownOutput();
+
+      reader =
+          new BufferedReader(
+                  new InputStreamReader(sock.getInputStream()));
+      StringBuilder sb = new StringBuilder();
+      String line;
+      while((line = reader.readLine()) != null) {
+          sb.append(line + "\n");
+      }
+      return sb.toString();
+    } finally {
+      sock.close();
+      if (reader != null) {
+          reader.close();
+      }
     }
+  }
 
-    private static int getPort(String hostPort) {
-        String[] split = hostPort.split(":");
-        String portstr = split[split.length-1];
-        String[] pc = portstr.split("/");
-        if (pc.length > 1) {
-            portstr = pc[0];
-        }
-        return Integer.parseInt(portstr);
+  public static boolean waitForServerUp(String hp, long timeout) {
+    long start = Time.now();
+    while (true) {
+      try {
+          // if there are multiple hostports, just take the first one
+          HostPort hpobj = parseHostPortList(hp).get(0);
+          String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
+          if (result.startsWith("Zookeeper version:") &&
+                  !result.contains("READ-ONLY")) {
+              return true;
+          }
+      } catch (IOException e) {
+          // ignore as this is expected
+          LOG.info("server " + hp + " not up " + e);
+      }
+
+      if (Time.now() > start + timeout) {
+          break;
+      }
+      try {
+          Thread.sleep(250);
+      } catch (InterruptedException e) {
+          // ignore
+      }
     }
+    return false;
+  }
+  public static boolean waitForServerDown(String hp, long timeout) {
+    long start = Time.now();
+    while (true) {
+      try {
+          HostPort hpobj = parseHostPortList(hp).get(0);
+          send4LetterWord(hpobj.host, hpobj.port, "stat");
+      } catch (IOException e) {
+          return true;
+      }
+
+      if (Time.now() > start + timeout) {
+          break;
+      }
+      try {
+          Thread.sleep(250);
+      } catch (InterruptedException e) {
+          // ignore
+      }
+    }
+    return false;
+  }
 
-    static ServerCnxnFactory createNewServerInstance(File dataDir,
-            ServerCnxnFactory factory, String hostPort, int maxCnxns)
-        throws IOException, InterruptedException
-    {
-        ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
-        final int PORT = getPort(hostPort);
-        if (factory == null) {
-            factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
-        }
-        factory.startup(zks);
-        Assert.assertTrue("waiting for server up",
-                   ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
-                                              CONNECTION_TIMEOUT));
+  public static File createTmpDir() throws IOException {
+    return createTmpDir(BASETEST);
+  }
+  static File createTmpDir(File parentDir) throws IOException {
+    File tmpFile = File.createTempFile("test", ".junit", parentDir);
+    // don't delete tmpFile - this ensures we don't attempt to create
+    // a tmpDir with a duplicate name
+    File tmpDir = new File(tmpFile + ".dir");
+    Assert.assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
+    Assert.assertTrue(tmpDir.mkdirs());
+
+    return tmpDir;
+  }
 
-        return factory;
+  private static int getPort(String hostPort) {
+    String[] split = hostPort.split(":");
+    String portstr = split[split.length-1];
+    String[] pc = portstr.split("/");
+    if (pc.length > 1) {
+      portstr = pc[0];
     }
+    return Integer.parseInt(portstr);
+  }
 
-    static void shutdownServerInstance(ServerCnxnFactory factory,
-            String hostPort)
-    {
-        if (factory != null) {
-            ZKDatabase zkDb;
-            {
-                ZooKeeperServer zs = getServer(factory);
-        
-                zkDb = zs.getZKDatabase();
-            }
-            factory.shutdown();
-            try {
-                zkDb.close();
-            } catch (IOException ie) {
-                LOG.warn("Error closing logs ", ie);
-            }
-            final int PORT = getPort(hostPort);
-
-            Assert.assertTrue("waiting for server down",
-                       ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
-                                                    CONNECTION_TIMEOUT));
-        }
+  static ServerCnxnFactory createNewServerInstance(File dataDir,
+          ServerCnxnFactory factory, String hostPort, int maxCnxns)
+      throws IOException, InterruptedException
+  {
+    ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
+    final int PORT = getPort(hostPort);
+    if (factory == null) {
+      factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
     }
+    factory.startup(zks);
+    Assert.assertTrue("waiting for server up",
+               ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
+                                          CONNECTION_TIMEOUT));
 
-    /**
-     * Test specific setup
-     */
-    public static void setupTestEnv() {
-        // during the tests we run with 100K prealloc in the logs.
-        // on windows systems prealloc of 64M was seen to take ~15seconds
-        // resulting in test Assert.failure (client timeout on first session).
-        // set env and directly in order to handle static init/gc issues
-        System.setProperty("zookeeper.preAllocSize", "100");
-        FileTxnLog.setPreallocSize(100 * 1024);
-    }
+    return factory;
+  }
 
-    protected void setUpAll() throws Exception {
-        allClients = new LinkedList();
-        allClientsSetup = true;
+  static void shutdownServerInstance(ServerCnxnFactory factory,
+          String hostPort)
+  {
+    if (factory != null) {
+      ZKDatabase zkDb;
+      {
+        ZooKeeperServer zs = getServer(factory);
+
+        zkDb = zs.getZKDatabase();
+      }
+      factory.shutdown();
+      try {
+        zkDb.close();
+      } catch (IOException ie) {
+        LOG.warn("Error closing logs ", ie);
+      }
+      final int PORT = getPort(hostPort);
+
+      Assert.assertTrue("waiting for server down",
+                 ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
+                                              CONNECTION_TIMEOUT));
     }
+  }
 
-    @Before
-    public void setUp() throws Exception {
-        BASETEST.mkdirs();
+  /**
+   * Test specific setup
+   */
+  public static void setupTestEnv() {
+    // during the tests we run with 100K prealloc in the logs.
+    // on windows systems prealloc of 64M was seen to take ~15seconds
+    // resulting in test Assert.failure (client timeout on first session).
+    // set env and directly in order to handle static init/gc issues
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100 * 1024);
+  }
 
-        setupTestEnv();
+  protected void setUpAll() throws Exception {
+    allClients = new LinkedList();
+    allClientsSetup = true;
+  }
 
-        setUpAll();
+  @Before
+  public void setUp() throws Exception {
+    BASETEST.mkdirs();
 
-        tmpDir = createTmpDir(BASETEST);
+    setupTestEnv();
 
-        startServer();
+    setUpAll();
 
-        LOG.info("Client test setup finished");
-    }
+    tmpDir = createTmpDir(BASETEST);
 
-    private String initHostPort() {
-        BASETEST.mkdirs();
-        int port = 0;
-        try {
-           port = ServerSocketUtil.getPort(port, 100);
-        } catch (IOException e) {
-           throw new RuntimeException(e);
-        }
-        return "127.0.0.1:" + port;
-    }
+    startServer();
 
-    protected void startServer() throws Exception {
-        LOG.info("STARTING server");
-        serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
-    }
+    LOG.info("Client test setup finished");
+  }
 
-    protected void stopServer() throws Exception {
-        LOG.info("STOPPING server");
-        shutdownServerInstance(serverFactory, hostPort);
-        serverFactory = null;
+  private String initHostPort() {
+    BASETEST.mkdirs();
+    int port = 0;
+    try {
+      port = ServerSocketUtil.getPort(port, 100);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
+    return "127.0.0.1:" + port;
+  }
 
+  protected void startServer() throws Exception {
+    LOG.info("STARTING server");
+    serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
+  }
 
-    protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
-        ZooKeeperServer zs = ServerCnxnFactoryAccessor.getZkServer(fac);
+  protected void stopServer() throws Exception {
+    LOG.info("STOPPING server");
+    shutdownServerInstance(serverFactory, hostPort);
+    serverFactory = null;
+  }
 
-        return zs;
-    }
 
-    protected void tearDownAll() throws Exception {
-        synchronized (this) {
-            if (allClients != null) for (ZooKeeper zk : allClients) {
-                try {
-                    if (zk != null)
-                        zk.close();
-                } catch (InterruptedException e) {
-                    LOG.warn("ignoring interrupt", e);
-                }
-            }
-            allClients = null;
+  protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
+    return fac.getZooKeeperServer();
+  }
+
+  protected void tearDownAll() throws Exception {
+    synchronized (this) {
+      if (allClients != null) for (ZooKeeper zk : allClients) {
+        try {
+            if (zk != null)
+                zk.close();
+        } catch (InterruptedException e) {
+            LOG.warn("ignoring interrupt", e);
         }
+      }
+      allClients = null;
     }
+  }
 
-    @After
-    public void tearDown() throws Exception {
-        LOG.info("tearDown starting");
-
-        tearDownAll();
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("tearDown starting");
 
-        stopServer();
+    tearDownAll();
 
-        if (tmpDir != null) {
-            Assert.assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
-        }
+    stopServer();
 
-        // This has to be set to null when the same instance of this class is reused between test cases
-        serverFactory = null;
+    if (tmpDir != null) {
+      Assert.assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
     }
 
-    public static boolean recursiveDelete(File d) {
-        if (d.isDirectory()) {
-            File children[] = d.listFiles();
-            for (File f : children) {
-                Assert.assertTrue("delete " + f.toString(), recursiveDelete(f));
-            }
-        }
-        return d.delete();
+    // This has to be set to null when the same instance of this class is reused between test cases
+    serverFactory = null;
+  }
+
+  public static boolean recursiveDelete(File d) {
+    if (d.isDirectory()) {
+      File children[] = d.listFiles();
+      for (File f : children) {
+        Assert.assertTrue("delete " + f.toString(), recursiveDelete(f));
+      }
     }
+    return d.delete();
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
index bdbf1d9c2c286..4dcc74b86d151 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -131,7 +132,7 @@ public void testRandomHealthAndDisconnects() throws Exception {
     long st = Time.now();
     while (Time.now() - st < runFor) {
       cluster.getTestContext().checkException();
-      serverFactory.closeAll();
+      serverFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
       Thread.sleep(50);
     }
   }
diff --git a/hadoop-common-project/hadoop-registry/pom.xml b/hadoop-common-project/hadoop-registry/pom.xml
index d7de0c49cc67a..204fbbe9da554 100644
--- a/hadoop-common-project/hadoop-registry/pom.xml
+++ b/hadoop-common-project/hadoop-registry/pom.xml
@@ -135,6 +135,17 @@
       dnsjava
     
 
+    
+      io.dropwizard.metrics
+      metrics-core
+    
+
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
+
   
 
   
diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
index 0ab4cd2f3bfac..994a2565c309a 100644
--- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
+++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
@@ -229,7 +229,7 @@ protected void serviceStart() throws Exception {
     setupSecurity();
 
     FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
-    ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime);
+    ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime, "");
 
     LOG.info("Starting Local Zookeeper service");
     factory = ServerCnxnFactory.createFactory();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
index 03ac256ace86e..fbf73e123e783 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
@@ -83,6 +83,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       test-jar
       test
     
+    
+      io.dropwizard.metrics
+      metrics-core
+      provided
+    
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
     
       org.apache.hadoop.thirdparty
       hadoop-shaded-guava
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index d0aac40646dfc..aef9bb8f35fde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -62,6 +62,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       test-jar
       test
     
+    
+      io.dropwizard.metrics
+      metrics-core
+      provided
+    
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
     
       org.apache.hadoop.thirdparty
       hadoop-shaded-guava
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index bcdfb8d1d9e24..b45d56d8d51e5 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -99,8 +99,8 @@
     ${hadoop-thirdparty-shaded-prefix}.protobuf
     ${hadoop-thirdparty-shaded-prefix}.com.google.common
 
-    3.5.6
-    4.2.0
+    3.6.3
+    5.2.0
     3.0.5
     2.1.7
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml
index 2e0781e352716..a19b87c87dc93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml
@@ -104,6 +104,17 @@
       
     
 
+    
+      io.dropwizard.metrics
+      metrics-core
+    
+
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
+
     
       org.slf4j
       slf4j-api
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 0e52269041797..91c946baba2be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -89,6 +89,16 @@
       test-jar
       test
     
+    
+      io.dropwizard.metrics
+      metrics-core
+      provided
+    
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
     
       junit
       junit
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 9e51d4ec048d9..6101467a4056e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -109,6 +109,15 @@
       org.apache.zookeeper
       zookeeper
     
+    
+      io.dropwizard.metrics
+      metrics-core
+    
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
     
       ${leveldbjni.group}
       leveldbjni-all
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index b2e46b8b6c69d..566f6bcc6839a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -217,6 +217,15 @@
       org.apache.zookeeper
       zookeeper
     
+    
+      io.dropwizard.metrics
+      metrics-core
+    
+    
+      org.xerial.snappy
+      snappy-java
+      provided
+    
     
       ${leveldbjni.group}
       leveldbjni-all