Skip to content

Commit c21b195

Browse files
committed
gbn: add timeout manager
To make the gbn package more modular, we move the responsibility of managing the timeout values to a new struct, the `TimeoutManager`. This make it easier to manage all timeout values used by the different components of the gbn package.
1 parent dd6046e commit c21b195

File tree

7 files changed

+207
-82
lines changed

7 files changed

+207
-82
lines changed

gbn/gbn_client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,11 @@ handshake:
122122
default:
123123
}
124124

125+
timeout := g.timeoutManager.GetHandshakeTimeout()
126+
125127
var b []byte
126128
select {
127-
case <-time.After(g.handshakeTimeout):
129+
case <-time.After(timeout):
128130
log.Debugf("SYN resendTimeout. Resending SYN.")
129131
continue handshake
130132
case <-g.quit:

gbn/gbn_conn.go

Lines changed: 33 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,7 @@ var (
1818
)
1919

2020
const (
21-
DefaultN = 20
22-
defaultHandshakeTimeout = 100 * time.Millisecond
23-
defaultResendTimeout = 100 * time.Millisecond
24-
finSendTimeout = 1000 * time.Millisecond
25-
DefaultSendTimeout = math.MaxInt64
26-
DefaultRecvTimeout = math.MaxInt64
21+
DefaultN = 20
2722
)
2823

2924
type sendBytesFunc func(ctx context.Context, b []byte) error
@@ -58,30 +53,16 @@ type GoBackNConn struct {
5853
// sequence that we have received.
5954
recvSeq uint8
6055

61-
// resendTimeout is the duration that will be waited before resending
62-
// the packets in the current queue.
63-
resendTimeout time.Duration
64-
resendTicker *time.Ticker
56+
resendTicker *time.Ticker
6557

6658
recvFromStream recvBytesFunc
6759
sendToStream sendBytesFunc
6860

6961
recvDataChan chan *PacketData
7062
sendDataChan chan *PacketData
7163

72-
sendTimeout time.Duration
73-
sendTimeoutMu sync.RWMutex
74-
75-
recvTimeout time.Duration
76-
recvTimeoutMu sync.RWMutex
77-
7864
isServer bool
7965

80-
// handshakeTimeout is the time after which the server or client
81-
// will abort and restart the handshake if the expected response is
82-
// not received from the peer.
83-
handshakeTimeout time.Duration
84-
8566
// receivedACKSignal channel is used to signal that the queue size has
8667
// been decreased.
8768
receivedACKSignal chan struct{}
@@ -109,6 +90,10 @@ type GoBackNConn struct {
10990
quit chan struct{}
11091
closeOnce sync.Once
11192
wg sync.WaitGroup
93+
94+
// timeoutManager is used to manage all the timeouts used by the
95+
// GoBackNConn.
96+
timeoutManager *TimeoutManager
11297
}
11398

11499
// newGoBackNConn creates a GoBackNConn instance with all the members which
@@ -118,51 +103,46 @@ func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
118103

119104
ctxc, cancel := context.WithCancel(ctx)
120105

106+
timeoutManager := NewTimeOutManager(isServer)
107+
108+
queue := newQueue(n+1, timeoutManager)
109+
121110
return &GoBackNConn{
122111
n: n,
123112
s: n + 1,
124-
resendTimeout: defaultResendTimeout,
125113
recvFromStream: recvFunc,
126114
sendToStream: sendFunc,
127115
recvDataChan: make(chan *PacketData, n),
128116
sendDataChan: make(chan *PacketData),
129117
isServer: isServer,
130-
sendQueue: newQueue(n+1, defaultHandshakeTimeout),
131-
handshakeTimeout: defaultHandshakeTimeout,
132-
recvTimeout: DefaultRecvTimeout,
133-
sendTimeout: DefaultSendTimeout,
118+
sendQueue: queue,
134119
receivedACKSignal: make(chan struct{}),
135120
resendSignal: make(chan struct{}, 1),
136121
remoteClosed: make(chan struct{}),
137122
ctx: ctxc,
138123
cancel: cancel,
139124
quit: make(chan struct{}),
125+
timeoutManager: timeoutManager,
140126
}
141127
}
142128

143-
// setN sets the current N to use. This _must_ be set before the handshake is
144-
// completed.
145-
func (g *GoBackNConn) setN(n uint8) {
146-
g.n = n
147-
g.s = n + 1
148-
g.recvDataChan = make(chan *PacketData, n)
149-
g.sendQueue = newQueue(n+1, defaultHandshakeTimeout)
150-
}
151-
152129
// SetSendTimeout sets the timeout used in the Send function.
153130
func (g *GoBackNConn) SetSendTimeout(timeout time.Duration) {
154-
g.sendTimeoutMu.Lock()
155-
defer g.sendTimeoutMu.Unlock()
156-
157-
g.sendTimeout = timeout
131+
g.timeoutManager.SetSendTimeout(timeout)
158132
}
159133

160134
// SetRecvTimeout sets the timeout used in the Recv function.
161135
func (g *GoBackNConn) SetRecvTimeout(timeout time.Duration) {
162-
g.recvTimeoutMu.Lock()
163-
defer g.recvTimeoutMu.Unlock()
136+
g.timeoutManager.SetRecvTimeout(timeout)
137+
}
164138

165-
g.recvTimeout = timeout
139+
// setN sets the current N to use. This _must_ be set before the handshake is
140+
// completed.
141+
func (g *GoBackNConn) setN(n uint8) {
142+
g.n = n
143+
g.s = n + 1
144+
g.recvDataChan = make(chan *PacketData, n)
145+
g.sendQueue = newQueue(n+1, g.timeoutManager)
166146
}
167147

168148
// Send blocks until an ack is received for the packet sent N packets before.
@@ -174,9 +154,7 @@ func (g *GoBackNConn) Send(data []byte) error {
174154
default:
175155
}
176156

177-
g.sendTimeoutMu.RLock()
178-
ticker := time.NewTimer(g.sendTimeout)
179-
g.sendTimeoutMu.RUnlock()
157+
ticker := time.NewTimer(g.timeoutManager.GetSendTimeout())
180158
defer ticker.Stop()
181159

182160
sendPacket := func(packet *PacketData) error {
@@ -235,9 +213,7 @@ func (g *GoBackNConn) Recv() ([]byte, error) {
235213
msg *PacketData
236214
)
237215

238-
g.recvTimeoutMu.RLock()
239-
ticker := time.NewTimer(g.recvTimeout)
240-
g.recvTimeoutMu.RUnlock()
216+
ticker := time.NewTimer(g.timeoutManager.GetRecvTimeout())
241217
defer ticker.Stop()
242218

243219
for {
@@ -279,7 +255,7 @@ func (g *GoBackNConn) start() {
279255

280256
g.pongTicker = NewIntervalAwareForceTicker(pongTime)
281257

282-
g.resendTicker = time.NewTicker(g.resendTimeout)
258+
g.resendTicker = time.NewTicker(g.timeoutManager.GetResendTimeout())
283259

284260
g.wg.Add(1)
285261
go func() {
@@ -335,7 +311,7 @@ func (g *GoBackNConn) Close() error {
335311
default:
336312
log.Tracef("Try sending FIN, isServer=%v", g.isServer)
337313
ctxc, cancel := context.WithTimeout(
338-
g.ctx, finSendTimeout,
314+
g.ctx, g.timeoutManager.GetFinSendTimeout(),
339315
)
340316
defer cancel()
341317
if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil {
@@ -388,7 +364,7 @@ func (g *GoBackNConn) sendPacketsForever() error {
388364
// resendQueue re-sends the current contents of the queue.
389365
resendQueue := func() error {
390366
err := g.sendQueue.resend(
391-
g.resendTimeout, g.quit,
367+
g.quit,
392368
func(packet *PacketData) error {
393369
return g.sendPacket(g.ctx, packet)
394370
},
@@ -400,7 +376,7 @@ func (g *GoBackNConn) sendPacketsForever() error {
400376
// execute. That can happen if the function was awaiting the
401377
// expected ACK for a long time, or times out while awaiting the
402378
// catch up.
403-
g.resendTicker.Reset(g.resendTimeout)
379+
g.resendTicker.Reset(g.timeoutManager.GetResendTimeout())
404380

405381
return err
406382
}
@@ -520,7 +496,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
520496
g.pongTicker.Pause()
521497
}
522498

523-
g.resendTicker.Reset(g.resendTimeout)
499+
g.resendTicker.Reset(g.timeoutManager.GetResendTimeout())
524500

525501
switch m := msg.(type) {
526502
case *PacketData:
@@ -578,7 +554,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
578554
// the resend, and therefore won't react to the
579555
// NACK we send here in time.
580556
sinceSent := time.Since(lastNackTime)
581-
recentlySent := sinceSent < g.resendTimeout*2 //nolint:gomnd
557+
558+
timeout := g.timeoutManager.GetResendTimeout()
559+
recentlySent := sinceSent < timeout*2 //nolint:gomnd
582560

583561
if lastNackSeq == g.recvSeq && recentlySent {
584562
log.Tracef("Recently sent NACK")
@@ -603,9 +581,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
603581
}
604582

605583
case *PacketACK:
606-
gotValidACK := g.sendQueue.processACK(
607-
m.Seq, g.resendTimeout,
608-
)
584+
gotValidACK := g.sendQueue.processACK(m.Seq)
609585

610586
if gotValidACK {
611587
// Send a signal to indicate that new

gbn/gbn_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo
140140
}
141141

142142
select {
143-
case <-time.After(g.handshakeTimeout):
143+
case <-time.After(g.timeoutManager.GetHandshakeTimeout()):
144144
log.Debugf("SYNCACK resendTimeout. Abort and wait " +
145145
"for client to re-initiate")
146146
continue

gbn/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func WithMaxSendSize(size int) Option {
1818
// for ACKs before resending the queue.
1919
func WithTimeout(timeout time.Duration) Option {
2020
return func(conn *GoBackNConn) {
21-
conn.resendTimeout = timeout
21+
conn.timeoutManager.SetResendTimeout(timeout)
2222
}
2323
}
2424

@@ -27,7 +27,7 @@ func WithTimeout(timeout time.Duration) Option {
2727
// will be aborted and restarted.
2828
func WithHandshakeTimeout(timeout time.Duration) Option {
2929
return func(conn *GoBackNConn) {
30-
conn.handshakeTimeout = timeout
30+
conn.timeoutManager.SetHandshakeTimeout(timeout)
3131
}
3232
}
3333

gbn/queue.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,19 @@ type queue struct {
102102
// topMtx is used to guard sequenceTop.
103103
topMtx sync.RWMutex
104104

105-
lastResend time.Time
106-
handshakeTimeout time.Duration
105+
lastResend time.Time
106+
107+
timeoutManager *TimeoutManager
107108
}
108109

109110
// newQueue creates a new queue.
110-
func newQueue(s uint8, handshakeTimeout time.Duration) *queue {
111+
func newQueue(s uint8, timeoutManager *TimeoutManager) *queue {
111112
return &queue{
112113
content: make([]*PacketData, s),
113114
s: s,
114-
handshakeTimeout: handshakeTimeout,
115115
awaitedACKSignal: make(chan struct{}, 1),
116116
awaitedNACKSignal: make(chan struct{}, 1),
117+
timeoutManager: timeoutManager,
117118
}
118119
}
119120

@@ -199,10 +200,10 @@ func (q *queue) addPacket(packet *PacketData) {
199200
//
200201
// When either of the 2 conditions above are met, we will consider both parties
201202
// to be in sync, and we can proceed to send new packets.
202-
func (q *queue) resend(resendTimeout time.Duration, quit chan struct{},
203+
func (q *queue) resend(quit chan struct{},
203204
sendFunc func(packet *PacketData) error,
204205
) error {
205-
if time.Since(q.lastResend) < q.handshakeTimeout {
206+
if time.Since(q.lastResend) < q.timeoutManager.GetHandshakeTimeout() {
206207
log.Tracef("Resent the queue recently.")
207208

208209
return nil
@@ -280,7 +281,7 @@ func (q *queue) resend(resendTimeout time.Duration, quit chan struct{},
280281
q.awaitingCatchUpMu.Unlock()
281282

282283
// Then await until we know that both parties are in sync.
283-
q.awaitCatchUp(resendTimeout, quit)
284+
q.awaitCatchUp(quit)
284285

285286
return nil
286287
}
@@ -290,8 +291,10 @@ func (q *queue) resend(resendTimeout time.Duration, quit chan struct{},
290291
// 3X the resend timeout, the function will also return.
291292
// See the docs for the resend function for more details on why we need to await
292293
// the awaited ACK or NACK signal.
293-
func (q *queue) awaitCatchUp(resendTimeout time.Duration, quit chan struct{}) {
294-
ticker := time.NewTimer(resendTimeout * awaitingTimeoutMultiplier)
294+
func (q *queue) awaitCatchUp(quit chan struct{}) {
295+
ticker := time.NewTimer(
296+
q.timeoutManager.GetResendTimeout() * awaitingTimeoutMultiplier,
297+
)
295298
defer ticker.Stop()
296299

297300
log.Tracef("Awaiting catchup after resending the queue")
@@ -351,7 +354,7 @@ func (q *queue) noPingPackets(base, top uint8) bool {
351354
}
352355

353356
// processACK processes an incoming ACK of a given sequence number.
354-
func (q *queue) processACK(seq uint8, resendTimeout time.Duration) bool {
357+
func (q *queue) processACK(seq uint8) bool {
355358
// If our queue is empty, an ACK should not have any effect.
356359
if q.size() == 0 {
357360
log.Tracef("Received ack %d, but queue is empty. Ignoring.", seq)
@@ -366,7 +369,7 @@ func (q *queue) processACK(seq uint8, resendTimeout time.Duration) bool {
366369
if seq == q.awaitedACK && q.awaitingCatchUp {
367370
log.Tracef("Got awaited ACK")
368371

369-
q.proceedAfterTime(q.catchUpID, resendTimeout)
372+
q.proceedAfterTime(q.catchUpID)
370373
}
371374
q.awaitingCatchUpMu.RUnlock()
372375

@@ -474,7 +477,7 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {
474477

475478
// proceedAfterTime will wait for the resendTimeout and then send an
476479
// awaitedACKSignal, if we're still awaiting the resend catch up.
477-
func (q *queue) proceedAfterTime(catchUpID int64, resendTimeout time.Duration) {
480+
func (q *queue) proceedAfterTime(catchUpID int64) {
478481
processAwaitedACK := func() {
479482
log.Tracef("Executing proceedAfterTime")
480483
q.awaitingCatchUpMu.Lock()
@@ -508,7 +511,7 @@ func (q *queue) proceedAfterTime(catchUpID int64, resendTimeout time.Duration) {
508511
// proceedAfterTime callback, as that's the time we'd expect it to take
509512
// for the other party to respond with a NACK, if the resent last packet
510513
// in the queue would lead to a NACK.
511-
time.AfterFunc(resendTimeout, processAwaitedACK)
514+
time.AfterFunc(q.timeoutManager.GetResendTimeout(), processAwaitedACK)
512515
}
513516

514517
// containsSequence is used to determine if a number, seq, is between two other

gbn/queue_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ import (
77
)
88

99
func TestQueueSize(t *testing.T) {
10-
q := newQueue(4, 0)
10+
timeoutManager := NewTimeOutManager(false)
1111

12-
require.Equal(t, uint8(0), q.size())
12+
queue := newQueue(4, timeoutManager)
1313

14-
q.sequenceBase = 2
15-
q.sequenceTop = 3
16-
require.Equal(t, uint8(1), q.size())
14+
require.Equal(t, uint8(0), queue.size())
1715

18-
q.sequenceBase = 3
19-
q.sequenceTop = 2
20-
require.Equal(t, uint8(3), q.size())
16+
queue.sequenceBase = 2
17+
queue.sequenceTop = 3
18+
require.Equal(t, uint8(1), queue.size())
19+
20+
queue.sequenceBase = 3
21+
queue.sequenceTop = 2
22+
require.Equal(t, uint8(3), queue.size())
2123
}

0 commit comments

Comments
 (0)