Skip to content

Commit 2c7b6cf

Browse files
committed
gbn: add client side resend loop protection
This commit ensures that after we have resent the queue, we will wait until we know that both parties are in sync before we continue to send new packets. This ensures that we don't end up in an indefinitely resend loop due to latency and delayed NACKs by the other party, which could happen prior to this commit. To understand why we need to await the awaited ACK/NACK after resending the queue, consider the following scenario: 1. Alice sends packets 1, 2, 3 & 4 to Bob. 2. Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs. 3. Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for packets 3 & 4 are delayed and aren't received until Alice resend timeout has passed, which leads to Alice resending packets 3 & 4. Alice will after that receive the delayed ACKs for packets 3 & 4, but will consider that as the ACKs for the resent packets, and not the original packets which they were actually sent for. If we didn't wait after resending the queue, Alice would then proceed to send more packets (5 & 6). 4. When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5. Due to latency, the packets 5 & 6 that Alice sent in step (3) above will then be received by Bob, and be processed as the correct response to the NACK 5. Bob will after that await packet 7. 5. Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is now awaiting packet 7, this send will lead to a NACK 7. But due to latency, if Alice doesn't wait resending the queue, Alice will proceed to send new packet(s) before receiving the NACK 7. 6. This resend loop would continue indefinitely, so we need to ensure that Alice waits after she has resent the queue, to ensure that she doesn't proceed to send new packets before she is sure that both parties are in sync. To ensure that we are in sync, after we have resent the queue, we will await that we either: 1. Receive a NACK for the sequence number succeeding the last packet in the resent queue i.e. in step (3) above, that would be NACK 5. OR 2. Receive an ACK for the last packet in the resent queue i.e. in step (3) above, that would be ACK 4. After we receive the expected ACK, we will then wait for the duration of the resend timeout before continuing. The reason why we wait for the resend timeout before continuing, is that the ACKs we are getting after a resend, could be delayed ACKs for the original packets we sent, and not ACKs for the resent packets. In step (3) above, the ACKs for packets 3 & 4 that Alice received were delayed ACKs for the original packets. If Alice would have immediately continued to send new packets (5 & 6) after receiving the ACK 4, she would have then received the NACK 5 from Bob which was the actual response to the resent queue. But as Alice had already continued to send packets 5 & 6 when receiving the NACK 5, the resend queue response to that NACK would cause the resend loop to continue indefinitely. When either of the 2 conditions above are met, we will consider both parties to be in sync, and we can proceed to send new packets.
1 parent 9db0d53 commit 2c7b6cf

File tree

5 files changed

+405
-68
lines changed

5 files changed

+405
-68
lines changed

gbn/gbn_client.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,7 @@ func NewClientConn(ctx context.Context, n uint8, sendFunc sendBytesFunc,
2121
math.MaxUint8)
2222
}
2323

24-
conn := newGoBackNConn(ctx, sendFunc, receiveFunc, false, n)
25-
26-
// Apply functional options
27-
for _, o := range opts {
28-
o(conn)
29-
}
24+
conn := newGoBackNConn(ctx, sendFunc, receiveFunc, false, n, opts...)
3025

3126
if err := conn.clientHandshake(); err != nil {
3227
if err := conn.Close(); err != nil {

gbn/gbn_conn.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -113,21 +113,21 @@ type GoBackNConn struct {
113113

114114
// newGoBackNConn creates a GoBackNConn instance with all the members which
115115
// are common between client and server initialised.
116+
//
117+
//nolint:varnamelen
118+
//nolint:gofumpt
116119
func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
117-
recvFunc recvBytesFunc, isServer bool, n uint8) *GoBackNConn {
120+
recvFunc recvBytesFunc, isServer bool, n uint8,
121+
opts ...Option) *GoBackNConn {
118122

119123
ctxc, cancel := context.WithCancel(ctx)
120124

121-
return &GoBackNConn{
122-
n: n,
123-
s: n + 1,
125+
gbn := &GoBackNConn{
124126
resendTimeout: defaultResendTimeout,
125127
recvFromStream: recvFunc,
126128
sendToStream: sendFunc,
127-
recvDataChan: make(chan *PacketData, n),
128129
sendDataChan: make(chan *PacketData),
129130
isServer: isServer,
130-
sendQueue: newQueue(n+1, defaultHandshakeTimeout),
131131
handshakeTimeout: defaultHandshakeTimeout,
132132
recvTimeout: DefaultRecvTimeout,
133133
sendTimeout: DefaultSendTimeout,
@@ -138,6 +138,14 @@ func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
138138
cancel: cancel,
139139
quit: make(chan struct{}),
140140
}
141+
142+
for _, o := range opts {
143+
o(gbn)
144+
}
145+
146+
gbn.setN(n)
147+
148+
return gbn
141149
}
142150

143151
// setN sets the current N to use. This _must_ be set before the handshake is
@@ -146,7 +154,12 @@ func (g *GoBackNConn) setN(n uint8) {
146154
g.n = n
147155
g.s = n + 1
148156
g.recvDataChan = make(chan *PacketData, n)
149-
g.sendQueue = newQueue(n+1, defaultHandshakeTimeout)
157+
g.sendQueue = newQueue(&queueConfig{
158+
s: g.s, //nolint:gci
159+
sendPkt: func(packet *PacketData) error {
160+
return g.sendPacket(g.ctx, packet)
161+
},
162+
})
150163
}
151164

152165
// SetSendTimeout sets the timeout used in the Send function.
@@ -348,6 +361,8 @@ func (g *GoBackNConn) Close() error {
348361
// initialisation.
349362
g.cancel()
350363

364+
g.sendQueue.stop()
365+
351366
g.wg.Wait()
352367

353368
if g.pingTicker != nil {
@@ -387,9 +402,17 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
387402
func (g *GoBackNConn) sendPacketsForever() error {
388403
// resendQueue re-sends the current contents of the queue.
389404
resendQueue := func() error {
390-
return g.sendQueue.resend(func(packet *PacketData) error {
391-
return g.sendPacket(g.ctx, packet)
392-
})
405+
err := g.sendQueue.resend(g.resendTimeout)
406+
407+
// After resending the queue, we reset the resend ticker.
408+
// This is so that we don't immediately resend the queue again,
409+
// if the sendQueue.resend call above took a long time to
410+
// execute. That can happen if the function was awaiting the
411+
// expected ACK for a long time, or times out while awaiting the
412+
// catch up.
413+
g.resendTicker.Reset(g.resendTimeout)
414+
415+
return err
393416
}
394417

395418
for {
@@ -578,7 +601,10 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
578601
}
579602

580603
case *PacketACK:
581-
gotValidACK := g.sendQueue.processACK(m.Seq)
604+
gotValidACK := g.sendQueue.processACK(
605+
m.Seq, g.resendTimeout,
606+
)
607+
582608
if gotValidACK {
583609
g.resendTicker.Reset(g.resendTimeout)
584610

@@ -597,15 +623,12 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
597623
// sent was dropped, or maybe we sent a duplicate
598624
// message. The NACK message contains the sequence
599625
// number that the receiver was expecting.
600-
inQueue, bumped := g.sendQueue.processNACK(m.Seq)
601-
602-
// If the NACK sequence number is not in our queue
603-
// then we ignore it. We must have received the ACK
604-
// for the sequence number in the meantime.
605-
if !inQueue {
606-
log.Tracef("NACK seq %d is not in the queue. "+
607-
"Ignoring. (isServer=%v)", m.Seq,
608-
g.isServer)
626+
shouldResend, bumped := g.sendQueue.processNACK(m.Seq)
627+
628+
// If we don't need to resend the queue after processing
629+
// the NACK, we can continue without sending the resend
630+
// signal.
631+
if !shouldResend {
609632
continue
610633
}
611634

gbn/gbn_server.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@ import (
1414
func NewServerConn(ctx context.Context, sendFunc sendBytesFunc,
1515
recvFunc recvBytesFunc, opts ...Option) (*GoBackNConn, error) {
1616

17-
conn := newGoBackNConn(ctx, sendFunc, recvFunc, true, DefaultN)
18-
19-
// Apply functional options
20-
for _, o := range opts {
21-
o(conn)
22-
}
17+
conn := newGoBackNConn(ctx, sendFunc, recvFunc, true, DefaultN, opts...)
2318

2419
if err := conn.serverHandshake(); err != nil {
2520
if err := conn.Close(); err != nil {

0 commit comments

Comments
 (0)