2424import java .util .ArrayList ;
2525import java .util .Collection ;
2626import java .util .Collections ;
27- import java .util .HashMap ;
2827import java .util .List ;
2928import java .util .Map ;
3029import java .util .Set ;
3130import java .util .TreeMap ;
3231import java .util .UUID ;
3332import java .util .concurrent .ConcurrentHashMap ;
34- import java .util .concurrent .PriorityBlockingQueue ;
3533import java .util .concurrent .TimeUnit ;
3634import java .util .concurrent .TimeoutException ;
3735import java .util .concurrent .atomic .AtomicBoolean ;
8583public class ReplicationSource implements ReplicationSourceInterface {
8684
8785 private static final Logger LOG = LoggerFactory .getLogger (ReplicationSource .class );
88- // Queues of logs to process, entry in format of walGroupId->queue,
89- // each presents a queue for one wal group
90- private Map <String , PriorityBlockingQueue <Path >> queues = new HashMap <>();
9186 // per group queue size, keep no more than this number of logs in each wal group
9287 protected int queueSizePerGroup ;
88+ protected ReplicationSourceLogQueue logQueue ;
9389 protected ReplicationQueueStorage queueStorage ;
9490 protected ReplicationPeer replicationPeer ;
9591
@@ -115,8 +111,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
115111 volatile boolean sourceRunning = false ;
116112 // Metrics for this source
117113 private MetricsSource metrics ;
118- // WARN threshold for the number of queued logs, defaults to 2
119- private int logQueueWarnThreshold ;
120114 // ReplicationEndpoint which will handle the actual replication
121115 private volatile ReplicationEndpoint replicationEndpoint ;
122116
@@ -210,6 +204,7 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
210204 this .maxRetriesMultiplier =
211205 this .conf .getInt ("replication.source.maxretriesmultiplier" , 300 ); // 5 minutes @ 1 sec per
212206 this .queueSizePerGroup = this .conf .getInt ("hbase.regionserver.maxlogs" , 32 );
207+ this .logQueue = new ReplicationSourceLogQueue (conf , metrics , this );
213208 this .queueStorage = queueStorage ;
214209 this .replicationPeer = replicationPeer ;
215210 this .manager = manager ;
@@ -219,7 +214,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
219214
220215 this .queueId = queueId ;
221216 this .replicationQueueInfo = new ReplicationQueueInfo (queueId );
222- this .logQueueWarnThreshold = this .conf .getInt ("replication.source.log.queue.warn" , 2 );
223217
224218 // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
225219 defaultBandwidth = this .conf .getLong ("replication.source.per.peer.node.bandwidth" , 0 );
@@ -250,35 +244,20 @@ public void enqueueLog(Path wal) {
250244 }
251245 // Use WAL prefix as the WALGroupId for this peer.
252246 String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal .getName ());
253- PriorityBlockingQueue <Path > queue = queues .get (walPrefix );
254- if (queue == null ) {
255- queue = new PriorityBlockingQueue <>(queueSizePerGroup ,
256- new AbstractFSWALProvider .WALStartTimeComparator ());
257- // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
258- // the shipper may quit immediately
259- queue .put (wal );
260- queues .put (walPrefix , queue );
247+ boolean queueExists = logQueue .enqueueLog (wal , walPrefix );
248+
249+ if (!queueExists ) {
261250 if (this .isSourceActive () && this .walEntryFilter != null ) {
262251 // new wal group observed after source startup, start a new worker thread to track it
263252 // notice: it's possible that wal enqueued when this.running is set but worker thread
264253 // still not launched, so it's necessary to check workerThreads before start the worker
265- tryStartNewShipper (walPrefix , queue );
254+ tryStartNewShipper (walPrefix );
266255 }
267- } else {
268- queue .put (wal );
269256 }
270257 if (LOG .isTraceEnabled ()) {
271258 LOG .trace ("{} Added wal {} to queue of source {}." , logPeerId (), walPrefix ,
272259 this .replicationQueueInfo .getQueueId ());
273260 }
274- this .metrics .incrSizeOfLogQueue ();
275- // This will wal a warning for each new wal that gets created above the warn threshold
276- int queueSize = queue .size ();
277- if (queueSize > this .logQueueWarnThreshold ) {
278- LOG .warn ("{} WAL group {} queue size: {} exceeds value of " +
279- "replication.source.log.queue.warn {}" , logPeerId (), walPrefix , queueSize ,
280- logQueueWarnThreshold );
281- }
282261 }
283262
284263 @ Override
@@ -370,16 +349,16 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
370349 this .walEntryFilter = new ChainWALEntryFilter (filters );
371350 }
372351
373- private void tryStartNewShipper (String walGroupId , PriorityBlockingQueue < Path > queue ) {
352+ private void tryStartNewShipper (String walGroupId ) {
374353 workerThreads .compute (walGroupId , (key , value ) -> {
375354 if (value != null ) {
376355 LOG .debug ("{} preempted start of shipping worker walGroupId={}" , logPeerId (), walGroupId );
377356 return value ;
378357 } else {
379358 LOG .debug ("{} starting shipping worker for walGroupId={}" , logPeerId (), walGroupId );
380- ReplicationSourceShipper worker = createNewShipper (walGroupId , queue );
359+ ReplicationSourceShipper worker = createNewShipper (walGroupId );
381360 ReplicationSourceWALReader walReader =
382- createNewWALReader (walGroupId , queue , worker .getStartPosition ());
361+ createNewWALReader (walGroupId , worker .getStartPosition ());
383362 Threads .setDaemonThreadRunning (
384363 walReader , Thread .currentThread ().getName ()
385364 + ".replicationSource.wal-reader." + walGroupId + "," + queueId ,
@@ -399,7 +378,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
399378 String walGroupId = walGroupShipper .getKey ();
400379 ReplicationSourceShipper shipper = walGroupShipper .getValue ();
401380 ageOfLastShippedOp = metrics .getAgeOfLastShippedOp (walGroupId );
402- int queueSize = queues . get (walGroupId ). size ( );
381+ int queueSize = logQueue . getQueueSize (walGroupId );
403382 replicationDelay = metrics .getReplicationDelay ();
404383 Path currentPath = shipper .getCurrentPath ();
405384 fileSize = -1 ;
@@ -438,16 +417,16 @@ private long getFileSize(Path currentPath) throws IOException {
438417 return fileSize ;
439418 }
440419
441- protected ReplicationSourceShipper createNewShipper (String walGroupId ,
442- PriorityBlockingQueue <Path > queue ) {
443- return new ReplicationSourceShipper (conf , walGroupId , queue , this );
420+ protected ReplicationSourceShipper createNewShipper (String walGroupId ) {
421+ return new ReplicationSourceShipper (conf , walGroupId , logQueue , this );
444422 }
445423
446- private ReplicationSourceWALReader createNewWALReader (String walGroupId ,
447- PriorityBlockingQueue <Path > queue , long startPosition ) {
424+ private ReplicationSourceWALReader createNewWALReader (String walGroupId , long startPosition ) {
448425 return replicationPeer .getPeerConfig ().isSerial ()
449- ? new SerialReplicationSourceWALReader (fs , conf , queue , startPosition , walEntryFilter , this )
450- : new ReplicationSourceWALReader (fs , conf , queue , startPosition , walEntryFilter , this );
426+ ? new SerialReplicationSourceWALReader (fs , conf , logQueue , startPosition , walEntryFilter ,
427+ this , walGroupId )
428+ : new ReplicationSourceWALReader (fs , conf , logQueue , startPosition , walEntryFilter ,
429+ this , walGroupId );
451430 }
452431
453432 /**
@@ -607,14 +586,12 @@ private void initialize() {
607586 throw new IllegalStateException ("Source should be active." );
608587 }
609588 LOG .info ("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}" ,
610- logPeerId (), this .replicationQueueInfo .getQueueId (), this . queues . size (), clusterId ,
589+ logPeerId (), this .replicationQueueInfo .getQueueId (), logQueue . getNumQueues (), clusterId ,
611590 peerClusterId );
612591 initializeWALEntryFilter (peerClusterId );
613592 // Start workers
614- for (Map .Entry <String , PriorityBlockingQueue <Path >> entry : queues .entrySet ()) {
615- String walGroupId = entry .getKey ();
616- PriorityBlockingQueue <Path > queue = entry .getValue ();
617- tryStartNewShipper (walGroupId , queue );
593+ for (String walGroupId : logQueue .getQueues ().keySet ()) {
594+ tryStartNewShipper (walGroupId );
618595 }
619596 this .startupOngoing .set (false );
620597 }
@@ -844,7 +821,7 @@ void removeWorker(ReplicationSourceShipper worker) {
844821 workerThreads .remove (worker .walGroupId , worker );
845822 }
846823
847- private String logPeerId (){
824+ public String logPeerId (){
848825 return "peerId=" + this .getPeerId () + "," ;
849826 }
850827}
0 commit comments