@@ -403,38 +403,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException
403403    // TODO: use empty initial offsets for now, revisit when adding support for sync replication 
404404    ReplicationSourceInterface  src  =
405405      createSource (new  ReplicationQueueData (queueId , ImmutableMap .of ()), peer );
406-     // synchronized here to avoid race with preLogRoll  where we add new log to source and also 
406+     // synchronized here to avoid race with postLogRoll  where we add new log to source and also 
407407    // walsById. 
408408    ReplicationSourceInterface  toRemove ;
409-     Map < String ,  NavigableSet < String >>  wals  =  new   HashMap <>() ;
409+     ReplicationQueueData   queueData ;
410410    synchronized  (latestPaths ) {
411+       // Here we make a copy of all the remaining wal files and then delete them from the 
412+       // replication queue storage after releasing the lock. It is not safe to just remove the old 
413+       // map from walsById since later we may fail to update the replication queue storage, and when 
414+       // we retry next time, we can not know the wal files that needs to be set to the replication 
415+       // queue storage 
416+       ImmutableMap .Builder <String , ReplicationGroupOffset > builder  = ImmutableMap .builder ();
417+       synchronized  (walsById ) {
418+         walsById .get (queueId ).forEach ((group , wals ) -> {
419+           if  (!wals .isEmpty ()) {
420+             builder .put (group , new  ReplicationGroupOffset (wals .last (), -1 ));
421+           }
422+         });
423+       }
424+       queueData  = new  ReplicationQueueData (queueId , builder .build ());
425+       src  = createSource (queueData , peer );
411426      toRemove  = sources .put (peerId , src );
412427      if  (toRemove  != null ) {
413428        LOG .info ("Terminate replication source for "  + toRemove .getPeerId ());
414429        toRemove .terminate (terminateMessage );
415430        toRemove .getSourceMetrics ().clear ();
416431      }
417-       // Here we make a copy of all the remaining wal files and then delete them from the 
418-       // replication queue storage after releasing the lock. It is not safe to just remove the old 
419-       // map from walsById since later we may fail to delete them from the replication queue 
420-       // storage, and when we retry next time, we can not know the wal files that need to be deleted 
421-       // from the replication queue storage. 
422-       walsById .get (queueId ).forEach ((k , v ) -> wals .put (k , new  TreeSet <>(v )));
432+     }
433+     for  (Map .Entry <String , ReplicationGroupOffset > entry  : queueData .getOffsets ().entrySet ()) {
434+       queueStorage .setOffset (queueId , entry .getKey (), entry .getValue (), Collections .emptyMap ());
423435    }
424436    LOG .info ("Startup replication source for "  + src .getPeerId ());
425437    src .startup ();
426-     for  (NavigableSet <String > walsByGroup  : wals .values ()) {
427-       // TODO: just need to reset the replication offset 
428-       // for (String wal : walsByGroup) { 
429-       // queueStorage.removeWAL(server.getServerName(), peerId, wal); 
430-       // } 
431-     }
432438    synchronized  (walsById ) {
433-       Map <String , NavigableSet <String >> oldWals  = walsById .get (queueId );
434-       wals . forEach ((k ,  v ) -> {
435-         NavigableSet <String > walsByGroup  = oldWals .get (k );
439+       Map <String , NavigableSet <String >> wals  = walsById .get (queueId );
440+       queueData . getOffsets (). forEach ((group ,  offset ) -> {
441+         NavigableSet <String > walsByGroup  = wals .get (group );
436442        if  (walsByGroup  != null ) {
437-           walsByGroup .removeAll ( v );
443+           walsByGroup .headSet ( offset . getWal (),  true ). clear ( );
438444        }
439445      });
440446    }
@@ -457,13 +463,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException
457463  }
458464
459465  private  ReplicationSourceInterface  createRefreshedSource (ReplicationQueueId  queueId ,
460-     ReplicationPeer  peer ) throws  IOException  {
461-     Map <String , ReplicationGroupOffset > offsets ;
462-     try  {
463-       offsets  = queueStorage .getOffsets (queueId );
464-     } catch  (ReplicationException  e ) {
465-       throw  new  IOException (e );
466-     }
466+     ReplicationPeer  peer ) throws  IOException , ReplicationException  {
467+     Map <String , ReplicationGroupOffset > offsets  = queueStorage .getOffsets (queueId );
467468    return  createSource (new  ReplicationQueueData (queueId , ImmutableMap .copyOf (offsets )), peer );
468469  }
469470
@@ -473,7 +474,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu
473474   * replication queue storage and only to enqueue all logs to the new replication source 
474475   * @param peerId the id of the replication peer 
475476   */ 
476-   public  void  refreshSources (String  peerId ) throws  IOException  {
477+   public  void  refreshSources (String  peerId ) throws  ReplicationException ,  IOException  {
477478    String  terminateMessage  = "Peer "  + peerId 
478479      + " state or config changed. Will close the previous replication source and open a new one" ;
479480    ReplicationPeer  peer  = replicationPeers .getPeer (peerId );
0 commit comments