4141import org .apache .zookeeper .ZooDefs ;
4242import org .apache .zookeeper .ZooKeeper ;
4343import org .apache .zookeeper .ZooKeeper .States ;
44+ import org .apache .zookeeper .client .ZKClientConfig ;
4445import org .apache .zookeeper .data .ACL ;
4546import org .apache .zookeeper .data .Stat ;
4647import org .apache .zookeeper .proto .CreateRequest ;
4950import org .slf4j .LoggerFactory ;
5051
5152/**
52- * A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need
53- * to realize that there are two classes of requests: idempotent and non-idempotent requests. Read
54- * requests and unconditional sets and deletes are examples of idempotent requests, they can be
55- * reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its
56- * effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling,
57- * application and library writers need to keep in mind that they may need to encode information in
58- * the data or name of znodes to detect retries. A simple example is a create that uses a sequence
59- * flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception,
60- * that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the
61- * process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109
62- * was the result of the previous create, so the process actually owns both x-109 and x-111. An easy
63- * way around this is to use "x-process id-" when doing the create. If the process is using an id of
64- * 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
53+ * A zookeeper that can handle 'recoverable' errors.
54+ * <p>
55+ * To handle recoverable errors, developers need to realize that there are two classes of requests:
56+ * idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are
57+ * examples of idempotent requests, they can be reissued with the same results.
58+ * <p>
59+ * (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is
60+ * the same.) Non-idempotent requests need special handling, application and library writers need to
61+ * keep in mind that they may need to encode information in the data or name of znodes to detect
62+ * retries. A simple example is a create that uses a sequence flag. If a process issues a
63+ * create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue
64+ * another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
65+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result
66+ * of the previous create, so the process actually owns both x-109 and x-111. An easy way around
67+ * this is to use "x-process id-" when doing the create. If the process is using an id of 352,
68+ * before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
6569 * "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it
6670 * created is "x-352-109".
6771 * @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling"
@@ -79,37 +83,31 @@ public class RecoverableZooKeeper {
7983 private final int sessionTimeout ;
8084 private final String quorumServers ;
8185 private final int maxMultiSize ;
86+ private final ZKClientConfig zkClientConfig ;
8287
8388 /**
84- * See {@link #connect(Configuration, String, Watcher, String)}
89+ * See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.
8590 */
8691 public static RecoverableZooKeeper connect (Configuration conf , Watcher watcher )
8792 throws IOException {
8893 String ensemble = ZKConfig .getZKQuorumServersString (conf );
89- return connect (conf , ensemble , watcher );
90- }
91-
92- /**
93- * See {@link #connect(Configuration, String, Watcher, String)}
94- */
95- public static RecoverableZooKeeper connect (Configuration conf , String ensemble , Watcher watcher )
96- throws IOException {
97- return connect (conf , ensemble , watcher , null );
94+ return connect (conf , ensemble , watcher , null , null );
9895 }
9996
10097 /**
10198 * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
10299 * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
103100 * watcher to the specified watcher.
104- * @param conf configuration to pull ensemble and other settings from
105- * @param watcher watcher to monitor connection changes
106- * @param ensemble ZooKeeper servers quorum string
107- * @param identifier value used to identify this client instance.
101+ * @param conf configuration to pull ensemble and other settings from
102+ * @param watcher watcher to monitor connection changes
103+ * @param ensemble ZooKeeper servers quorum string
104+ * @param identifier value used to identify this client instance.
105+ * @param zkClientConfig client specific configurations for this instance
108106 * @return connection to zookeeper
109107 * @throws IOException if unable to connect to zk or config problem
110108 */
111109 public static RecoverableZooKeeper connect (Configuration conf , String ensemble , Watcher watcher ,
112- final String identifier ) throws IOException {
110+ final String identifier , ZKClientConfig zkClientConfig ) throws IOException {
113111 if (ensemble == null ) {
114112 throw new IOException ("Unable to determine ZooKeeper ensemble" );
115113 }
@@ -122,14 +120,12 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
122120 int maxSleepTime = conf .getInt ("zookeeper.recovery.retry.maxsleeptime" , 60000 );
123121 int multiMaxSize = conf .getInt ("zookeeper.multi.max.size" , 1024 * 1024 );
124122 return new RecoverableZooKeeper (ensemble , timeout , watcher , retry , retryIntervalMillis ,
125- maxSleepTime , identifier , multiMaxSize );
123+ maxSleepTime , identifier , multiMaxSize , zkClientConfig );
126124 }
127125
128- @ edu .umd .cs .findbugs .annotations .SuppressWarnings (value = "DE_MIGHT_IGNORE" ,
129- justification = "None. Its always been this way." )
130- public RecoverableZooKeeper (String quorumServers , int sessionTimeout , Watcher watcher ,
131- int maxRetries , int retryIntervalMillis , int maxSleepTime , String identifier , int maxMultiSize )
132- throws IOException {
126+ RecoverableZooKeeper (String quorumServers , int sessionTimeout , Watcher watcher , int maxRetries ,
127+ int retryIntervalMillis , int maxSleepTime , String identifier , int maxMultiSize ,
128+ ZKClientConfig zkClientConfig ) throws IOException {
133129 // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
134130 this .retryCounterFactory =
135131 new RetryCounterFactory (maxRetries + 1 , retryIntervalMillis , maxSleepTime );
@@ -147,12 +143,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa
147143 this .sessionTimeout = sessionTimeout ;
148144 this .quorumServers = quorumServers ;
149145 this .maxMultiSize = maxMultiSize ;
150-
151- try {
152- checkZk ();
153- } catch (Exception x ) {
154- /* ignore */
155- }
146+ this .zkClientConfig = zkClientConfig ;
156147 }
157148
158149 /**
@@ -171,10 +162,10 @@ public int getMaxMultiSizeLimit() {
171162 * @return The created ZooKeeper connection object
172163 * @throws KeeperException if a ZooKeeper operation fails
173164 */
174- protected synchronized ZooKeeper checkZk () throws KeeperException {
165+ private synchronized ZooKeeper checkZk () throws KeeperException {
175166 if (this .zk == null ) {
176167 try {
177- this .zk = new ZooKeeper (quorumServers , sessionTimeout , watcher );
168+ this .zk = new ZooKeeper (quorumServers , sessionTimeout , watcher , zkClientConfig );
178169 } catch (IOException ex ) {
179170 LOG .warn ("Unable to create ZooKeeper Connection" , ex );
180171 throw new KeeperException .OperationTimeoutException ();
0 commit comments