@@ -89,13 +89,10 @@ func newGoBackNConn(ctx context.Context, cfg *config,
8989 prefix := fmt .Sprintf ("(%s)" , loggerPrefix )
9090 plog := build .NewPrefixLog (prefix , log )
9191
92- return & GoBackNConn {
93- cfg : cfg ,
94- recvDataChan : make (chan * PacketData , cfg .n ),
95- sendDataChan : make (chan * PacketData ),
96- sendQueue : newQueue (
97- cfg .n + 1 , defaultHandshakeTimeout , plog ,
98- ),
92+ g := & GoBackNConn {
93+ cfg : cfg ,
94+ recvDataChan : make (chan * PacketData , cfg .n ),
95+ sendDataChan : make (chan * PacketData ),
9996 recvTimeout : DefaultRecvTimeout ,
10097 sendTimeout : DefaultSendTimeout ,
10198 receivedACKSignal : make (chan struct {}),
@@ -106,6 +103,17 @@ func newGoBackNConn(ctx context.Context, cfg *config,
106103 log : plog ,
107104 quit : make (chan struct {}),
108105 }
106+
107+ g .sendQueue = newQueue (& queueCfg {
108+ s : cfg .n + 1 ,
109+ timeout : cfg .resendTimeout ,
110+ log : plog ,
111+ sendPkt : func (packet * PacketData ) error {
112+ return g .sendPacket (g .ctx , packet )
113+ },
114+ })
115+
116+ return g
109117}
110118
111119// setN sets the current N to use. This _must_ be set before the handshake is
@@ -114,7 +122,14 @@ func (g *GoBackNConn) setN(n uint8) {
114122 g .cfg .n = n
115123 g .cfg .s = n + 1
116124 g .recvDataChan = make (chan * PacketData , n )
117- g .sendQueue = newQueue (n + 1 , defaultHandshakeTimeout , g .log )
125+ g .sendQueue = newQueue (& queueCfg {
126+ s : n + 1 ,
127+ timeout : g .cfg .resendTimeout ,
128+ log : g .log ,
129+ sendPkt : func (packet * PacketData ) error {
130+ return g .sendPacket (g .ctx , packet )
131+ },
132+ })
118133}
119134
120135// SetSendTimeout sets the timeout used in the Send function.
@@ -320,6 +335,8 @@ func (g *GoBackNConn) Close() error {
320335 // initialisation.
321336 g .cancel ()
322337
338+ g .sendQueue .stop ()
339+
323340 g .wg .Wait ()
324341
325342 if g .pingTicker != nil {
@@ -359,9 +376,28 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
359376func (g * GoBackNConn ) sendPacketsForever () error {
360377 // resendQueue re-sends the current contents of the queue.
361378 resendQueue := func () error {
362- return g .sendQueue .resend (func (packet * PacketData ) error {
363- return g .sendPacket (g .ctx , packet )
364- })
379+ err := g .sendQueue .resend ()
380+ if err != nil {
381+ return err
382+ }
383+
384+ // After resending the queue, we reset the resend ticker.
385+ // This is so that we don't immediately resend the queue again,
386+ // if the sendQueue.resend call above took a long time to
387+ // execute. That can happen if the function was awaiting the
388+ // expected ACK for a long time, or times out while awaiting the
389+ // catch up.
390+ g .resendTicker .Reset (g .cfg .resendTimeout )
391+
392+ // Also drain the resend signal channel, as resendTicker.Reset
393+ // doesn't drain the channel if the ticker ticked during the
394+ // sendQueue.resend() call above.
395+ select {
396+ case <- g .resendTicker .C :
397+ default :
398+ }
399+
400+ return nil
365401 }
366402
367403 for {
@@ -478,6 +514,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
478514 g .pongTicker .Pause ()
479515 }
480516
517+ g .resendTicker .Reset (g .cfg .resendTimeout )
518+
481519 switch m := msg .(type ) {
482520 case * PacketData :
483521 switch m .Seq == g .recvSeq {
@@ -526,9 +564,19 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
526564
527565 // If we recently sent a NACK for the same
528566 // sequence number then back off.
529- if lastNackSeq == g .recvSeq &&
530- time .Since (lastNackTime ) <
531- g .cfg .resendTimeout {
567+ // We wait 2 times the resendTimeout before
568+ // sending a new nack, as this case is likely
569+ // hit if the sender is currently resending
570+ // the queue, and therefore the threads that
571+ // are resending the queue is likely busy with
572+ // the resend, and therefore won't react to the
573+ // NACK we send here in time.
574+ sinceSent := time .Since (lastNackTime )
575+ recentlySent := sinceSent <
576+ g .cfg .resendTimeout * 2
577+
578+ if lastNackSeq == g .recvSeq && recentlySent {
579+ g .log .Tracef ("Recently sent NACK" )
532580
533581 continue
534582 }
@@ -552,8 +600,6 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
552600 case * PacketACK :
553601 gotValidACK := g .sendQueue .processACK (m .Seq )
554602 if gotValidACK {
555- g .resendTicker .Reset (g .cfg .resendTimeout )
556-
557603 // Send a signal to indicate that new
558604 // ACKs have been received.
559605 select {
@@ -569,15 +615,12 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
569615 // sent was dropped, or maybe we sent a duplicate
570616 // message. The NACK message contains the sequence
571617 // number that the receiver was expecting.
572- inQueue , bumped := g .sendQueue .processNACK (m .Seq )
573-
574- // If the NACK sequence number is not in our queue
575- // then we ignore it. We must have received the ACK
576- // for the sequence number in the meantime.
577- if ! inQueue {
578- g .log .Tracef ("NACK seq %d is not in the " +
579- "queue. Ignoring" , m .Seq )
618+ shouldResend , bumped := g .sendQueue .processNACK (m .Seq )
580619
620+ // If we don't need to resend the queue after processing
621+ // the NACK, we can continue without sending the resend
622+ // signal.
623+ if ! shouldResend {
581624 continue
582625 }
583626
@@ -606,6 +649,10 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
606649
607650 close (g .remoteClosed )
608651
652+ if g .cfg .onFIN != nil {
653+ g .cfg .onFIN ()
654+ }
655+
609656 return errTransportClosing
610657
611658 default :
0 commit comments