@@ -403,38 +403,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException
403403 // TODO: use empty initial offsets for now, revisit when adding support for sync replication
404404 ReplicationSourceInterface src =
405405 createSource (new ReplicationQueueData (queueId , ImmutableMap .of ()), peer );
406- // synchronized here to avoid race with preLogRoll where we add new log to source and also
406+ // synchronized here to avoid race with postLogRoll where we add new log to source and also
407407 // walsById.
408408 ReplicationSourceInterface toRemove ;
409- Map < String , NavigableSet < String >> wals = new HashMap <>() ;
409+ ReplicationQueueData queueData ;
410410 synchronized (latestPaths ) {
411+ // Here we make a copy of all the remaining wal files and then delete them from the
412+ // replication queue storage after releasing the lock. It is not safe to just remove the old
413+ // map from walsById since later we may fail to update the replication queue storage, and when
414+ // we retry next time, we can not know the wal files that need to be set to the replication
415+ // queue storage.
416+ ImmutableMap .Builder <String , ReplicationGroupOffset > builder = ImmutableMap .builder ();
417+ synchronized (walsById ) {
418+ walsById .get (queueId ).forEach ((group , wals ) -> {
419+ if (!wals .isEmpty ()) {
420+ builder .put (group , new ReplicationGroupOffset (wals .last (), -1 ));
421+ }
422+ });
423+ }
424+ queueData = new ReplicationQueueData (queueId , builder .build ());
425+ src = createSource (queueData , peer );
411426 toRemove = sources .put (peerId , src );
412427 if (toRemove != null ) {
413428 LOG .info ("Terminate replication source for " + toRemove .getPeerId ());
414429 toRemove .terminate (terminateMessage );
415430 toRemove .getSourceMetrics ().clear ();
416431 }
417- // Here we make a copy of all the remaining wal files and then delete them from the
418- // replication queue storage after releasing the lock. It is not safe to just remove the old
419- // map from walsById since later we may fail to delete them from the replication queue
420- // storage, and when we retry next time, we can not know the wal files that need to be deleted
421- // from the replication queue storage.
422- walsById .get (queueId ).forEach ((k , v ) -> wals .put (k , new TreeSet <>(v )));
432+ }
433+ for (Map .Entry <String , ReplicationGroupOffset > entry : queueData .getOffsets ().entrySet ()) {
434+ queueStorage .setOffset (queueId , entry .getKey (), entry .getValue (), Collections .emptyMap ());
423435 }
424436 LOG .info ("Startup replication source for " + src .getPeerId ());
425437 src .startup ();
426- for (NavigableSet <String > walsByGroup : wals .values ()) {
427- // TODO: just need to reset the replication offset
428- // for (String wal : walsByGroup) {
429- // queueStorage.removeWAL(server.getServerName(), peerId, wal);
430- // }
431- }
432438 synchronized (walsById ) {
433- Map <String , NavigableSet <String >> oldWals = walsById .get (queueId );
434- wals . forEach ((k , v ) -> {
435- NavigableSet <String > walsByGroup = oldWals .get (k );
439+ Map <String , NavigableSet <String >> wals = walsById .get (queueId );
440+ queueData . getOffsets (). forEach ((group , offset ) -> {
441+ NavigableSet <String > walsByGroup = wals .get (group );
436442 if (walsByGroup != null ) {
437- walsByGroup .removeAll ( v );
443+ walsByGroup .headSet ( offset . getWal (), true ). clear ( );
438444 }
439445 });
440446 }
@@ -457,13 +463,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException
457463 }
458464
459465 private ReplicationSourceInterface createRefreshedSource (ReplicationQueueId queueId ,
460- ReplicationPeer peer ) throws IOException {
461- Map <String , ReplicationGroupOffset > offsets ;
462- try {
463- offsets = queueStorage .getOffsets (queueId );
464- } catch (ReplicationException e ) {
465- throw new IOException (e );
466- }
466+ ReplicationPeer peer ) throws IOException , ReplicationException {
467+ Map <String , ReplicationGroupOffset > offsets = queueStorage .getOffsets (queueId );
467468 return createSource (new ReplicationQueueData (queueId , ImmutableMap .copyOf (offsets )), peer );
468469 }
469470
@@ -473,7 +474,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu
473474 * replication queue storage and only to enqueue all logs to the new replication source
474475 * @param peerId the id of the replication peer
475476 */
476- public void refreshSources (String peerId ) throws IOException {
477+ public void refreshSources (String peerId ) throws ReplicationException , IOException {
477478 String terminateMessage = "Peer " + peerId
478479 + " state or config changed. Will close the previous replication source and open a new one" ;
479480 ReplicationPeer peer = replicationPeers .getPeer (peerId );
0 commit comments