Skip to content

Commit 09c03b8

Browse files
committed
gbn: use timeout manager to handle timeouts
This commit moves the responsibility of managing the timeout values throughout the gbn package to the `TimeoutManager` struct.
1 parent eb69491 commit 09c03b8

File tree

7 files changed

+72
-85
lines changed

7 files changed

+72
-85
lines changed

gbn/gbn_client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,11 @@ handshake:
120120
default:
121121
}
122122

123+
timeout := g.timeoutManager.GetHandshakeTimeout()
124+
123125
var b []byte
124126
select {
125-
case <-time.After(g.handshakeTimeout):
127+
case <-time.After(timeout):
126128
log.Debugf("SYN resendTimeout. Resending SYN.")
127129
continue handshake
128130
case <-g.quit:

gbn/gbn_conn.go

Lines changed: 34 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,7 @@ var (
1818
)
1919

2020
const (
21-
DefaultN = 20
22-
defaultHandshakeTimeout = 100 * time.Millisecond
23-
defaultResendTimeout = 100 * time.Millisecond
24-
finSendTimeout = 1000 * time.Millisecond
25-
DefaultSendTimeout = math.MaxInt64
26-
DefaultRecvTimeout = math.MaxInt64
21+
DefaultN = 20
2722
)
2823

2924
type sendBytesFunc func(ctx context.Context, b []byte) error
@@ -58,30 +53,16 @@ type GoBackNConn struct {
5853
// sequence that we have received.
5954
recvSeq uint8
6055

61-
// resendTimeout is the duration that will be waited before resending
62-
// the packets in the current queue.
63-
resendTimeout time.Duration
64-
resendTicker *time.Ticker
56+
resendTicker *time.Ticker
6557

6658
recvFromStream recvBytesFunc
6759
sendToStream sendBytesFunc
6860

6961
recvDataChan chan *PacketData
7062
sendDataChan chan *PacketData
7163

72-
sendTimeout time.Duration
73-
sendTimeoutMu sync.RWMutex
74-
75-
recvTimeout time.Duration
76-
recvTimeoutMu sync.RWMutex
77-
7864
isServer bool
7965

80-
// handshakeTimeout is the time after which the server or client
81-
// will abort and restart the handshake if the expected response is
82-
// not received from the peer.
83-
handshakeTimeout time.Duration
84-
8566
// receivedACKSignal channel is used to signal that the queue size has
8667
// been decreased.
8768
receivedACKSignal chan struct{}
@@ -103,6 +84,10 @@ type GoBackNConn struct {
10384
// remoteClosed is closed if the remote party initiated the FIN sequence.
10485
remoteClosed chan struct{}
10586

87+
// timeoutManager is used to manage all the timeouts used by the
88+
// GoBackNConn.
89+
timeoutManager *TimeoutManager
90+
10691
// quit is used to stop the normal operations of the connection.
10792
// Once closed, the send and receive streams will still be available
10893
// for the FIN sequence.
@@ -121,21 +106,20 @@ func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
121106

122107
ctxc, cancel := context.WithCancel(ctx)
123108

109+
timeoutManager := NewTimeOutManager(isServer)
110+
124111
gbn := &GoBackNConn{
125-
resendTimeout: defaultResendTimeout,
126112
recvFromStream: recvFunc,
127113
sendToStream: sendFunc,
128114
sendDataChan: make(chan *PacketData),
129115
isServer: isServer,
130-
handshakeTimeout: defaultHandshakeTimeout,
131-
recvTimeout: DefaultRecvTimeout,
132-
sendTimeout: DefaultSendTimeout,
133116
receivedACKSignal: make(chan struct{}),
134117
resendSignal: make(chan struct{}, 1),
135118
remoteClosed: make(chan struct{}),
136119
ctx: ctxc,
137120
cancel: cancel,
138121
quit: make(chan struct{}),
122+
timeoutManager: timeoutManager,
139123
}
140124

141125
for _, o := range opts {
@@ -147,34 +131,31 @@ func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
147131
return gbn
148132
}
149133

134+
// SetSendTimeout sets the timeout used in the Send function.
135+
func (g *GoBackNConn) SetSendTimeout(timeout time.Duration) {
136+
g.timeoutManager.SetSendTimeout(timeout)
137+
}
138+
139+
// SetRecvTimeout sets the timeout used in the Recv function.
140+
func (g *GoBackNConn) SetRecvTimeout(timeout time.Duration) {
141+
g.timeoutManager.SetRecvTimeout(timeout)
142+
}
143+
150144
// setN sets the current N to use. This _must_ be set before the handshake is
151145
// completed.
152146
func (g *GoBackNConn) setN(n uint8) {
153147
g.n = n
154148
g.s = n + 1
155149
g.recvDataChan = make(chan *PacketData, n)
156-
g.sendQueue = newQueue(&queueConfig{
150+
151+
cfg := &queueConfig{
157152
s: g.s,
158153
sendPkt: func(packet *PacketData) error {
159154
return g.sendPacket(g.ctx, packet)
160155
},
161-
})
162-
}
163-
164-
// SetSendTimeout sets the timeout used in the Send function.
165-
func (g *GoBackNConn) SetSendTimeout(timeout time.Duration) {
166-
g.sendTimeoutMu.Lock()
167-
defer g.sendTimeoutMu.Unlock()
168-
169-
g.sendTimeout = timeout
170-
}
171-
172-
// SetRecvTimeout sets the timeout used in the Recv function.
173-
func (g *GoBackNConn) SetRecvTimeout(timeout time.Duration) {
174-
g.recvTimeoutMu.Lock()
175-
defer g.recvTimeoutMu.Unlock()
156+
}
176157

177-
g.recvTimeout = timeout
158+
g.sendQueue = newQueue(cfg, g.timeoutManager)
178159
}
179160

180161
// Send blocks until an ack is received for the packet sent N packets before.
@@ -186,9 +167,7 @@ func (g *GoBackNConn) Send(data []byte) error {
186167
default:
187168
}
188169

189-
g.sendTimeoutMu.RLock()
190-
ticker := time.NewTimer(g.sendTimeout)
191-
g.sendTimeoutMu.RUnlock()
170+
ticker := time.NewTimer(g.timeoutManager.GetSendTimeout())
192171
defer ticker.Stop()
193172

194173
sendPacket := func(packet *PacketData) error {
@@ -247,9 +226,7 @@ func (g *GoBackNConn) Recv() ([]byte, error) {
247226
msg *PacketData
248227
)
249228

250-
g.recvTimeoutMu.RLock()
251-
ticker := time.NewTimer(g.recvTimeout)
252-
g.recvTimeoutMu.RUnlock()
229+
ticker := time.NewTimer(g.timeoutManager.GetRecvTimeout())
253230
defer ticker.Stop()
254231

255232
for {
@@ -291,7 +268,7 @@ func (g *GoBackNConn) start() {
291268

292269
g.pongTicker = NewIntervalAwareForceTicker(pongTime)
293270

294-
g.resendTicker = time.NewTicker(g.resendTimeout)
271+
g.resendTicker = time.NewTicker(g.timeoutManager.GetResendTimeout())
295272

296273
g.wg.Add(1)
297274
go func() {
@@ -347,7 +324,7 @@ func (g *GoBackNConn) Close() error {
347324
default:
348325
log.Tracef("Try sending FIN, isServer=%v", g.isServer)
349326
ctxc, cancel := context.WithTimeout(
350-
g.ctx, finSendTimeout,
327+
g.ctx, g.timeoutManager.GetFinSendTimeout(),
351328
)
352329
defer cancel()
353330
if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil {
@@ -401,15 +378,15 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
401378
func (g *GoBackNConn) sendPacketsForever() error {
402379
// resendQueue re-sends the current contents of the queue.
403380
resendQueue := func() error {
404-
err := g.sendQueue.resend(g.resendTimeout)
381+
err := g.sendQueue.resend()
405382

406383
// After resending the queue, we reset the resend ticker.
407384
// This is so that we don't immediately resend the queue again,
408385
// if the sendQueue.resend call above took a long time to
409386
// execute. That can happen if the function was awaiting the
410387
// expected ACK for a long time, or times out while awaiting the
411388
// catch up.
412-
g.resendTicker.Reset(g.resendTimeout)
389+
g.resendTicker.Reset(g.timeoutManager.GetResendTimeout())
413390

414391
return err
415392
}
@@ -529,7 +506,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
529506
g.pongTicker.Pause()
530507
}
531508

532-
g.resendTicker.Reset(g.resendTimeout)
509+
g.resendTicker.Reset(g.timeoutManager.GetResendTimeout())
533510

534511
switch m := msg.(type) {
535512
case *PacketData:
@@ -587,7 +564,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
587564
// the resend, and therefore won't react to the
588565
// NACK we send here in time.
589566
sinceSent := time.Since(lastNackTime)
590-
recentlySent := sinceSent < g.resendTimeout*2 //nolint:gomnd
567+
568+
timeout := g.timeoutManager.GetResendTimeout()
569+
recentlySent := sinceSent < timeout*2 //nolint:gomnd
591570

592571
if lastNackSeq == g.recvSeq && recentlySent {
593572
log.Tracef("Recently sent NACK")
@@ -612,9 +591,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
612591
}
613592

614593
case *PacketACK:
615-
gotValidACK := g.sendQueue.processACK(
616-
m.Seq, g.resendTimeout,
617-
)
594+
gotValidACK := g.sendQueue.processACK(m.Seq)
618595

619596
if gotValidACK {
620597
// Send a signal to indicate that new

gbn/gbn_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo
135135
}
136136

137137
select {
138-
case <-time.After(g.handshakeTimeout):
138+
case <-time.After(g.timeoutManager.GetHandshakeTimeout()):
139139
log.Debugf("SYNCACK resendTimeout. Abort and wait " +
140140
"for client to re-initiate")
141141
continue

gbn/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func WithMaxSendSize(size int) Option {
1818
// for ACKs before resending the queue.
1919
func WithTimeout(timeout time.Duration) Option {
2020
return func(conn *GoBackNConn) {
21-
conn.resendTimeout = timeout
21+
conn.timeoutManager.SetStaticResendTimeout(timeout)
2222
}
2323
}
2424

@@ -27,7 +27,7 @@ func WithTimeout(timeout time.Duration) Option {
2727
// will be aborted and restarted.
2828
func WithHandshakeTimeout(timeout time.Duration) Option {
2929
return func(conn *GoBackNConn) {
30-
conn.handshakeTimeout = timeout
30+
conn.timeoutManager.SetHandshakeTimeout(timeout)
3131
}
3232
}
3333

gbn/queue.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,19 @@ type queue struct {
106106
lastResend time.Time
107107

108108
quit chan struct{}
109+
110+
timeoutManager *TimeoutManager
109111
}
110112

111113
// newQueue creates a new queue.
112-
func newQueue(cfg *queueConfig) *queue {
114+
func newQueue(cfg *queueConfig, timeoutManager *TimeoutManager) *queue {
113115
return &queue{
114116
cfg: cfg,
115117
content: make([]*PacketData, cfg.s),
116118
awaitedACKSignal: make(chan struct{}, 1),
117119
awaitedNACKSignal: make(chan struct{}, 1),
118120
quit: make(chan struct{}),
121+
timeoutManager: timeoutManager,
119122
}
120123
}
121124

@@ -205,8 +208,8 @@ func (q *queue) addPacket(packet *PacketData) {
205208
//
206209
// When either of the 2 conditions above are met, we will consider both parties
207210
// to be in sync, and we can proceed to send new packets.
208-
func (q *queue) resend(resendTimeout time.Duration) error {
209-
if time.Since(q.lastResend) < resendTimeout {
211+
func (q *queue) resend() error {
212+
if time.Since(q.lastResend) < q.timeoutManager.GetHandshakeTimeout() {
210213
log.Tracef("Resent the queue recently.")
211214

212215
return nil
@@ -284,7 +287,7 @@ func (q *queue) resend(resendTimeout time.Duration) error {
284287
q.awaitingCatchUpMu.Unlock()
285288

286289
// Then await until we know that both parties are in sync.
287-
q.awaitCatchUp(resendTimeout)
290+
q.awaitCatchUp()
288291

289292
return nil
290293
}
@@ -296,8 +299,10 @@ func (q *queue) resend(resendTimeout time.Duration) error {
296299
// the awaited ACK or NACK signal.
297300
//
298301
//nolint:cyclop
299-
func (q *queue) awaitCatchUp(resendTimeout time.Duration) {
300-
ticker := time.NewTimer(resendTimeout * awaitingTimeoutMultiplier)
302+
func (q *queue) awaitCatchUp() {
303+
ticker := time.NewTimer(
304+
q.timeoutManager.GetResendTimeout() * awaitingTimeoutMultiplier,
305+
)
301306
defer ticker.Stop()
302307

303308
log.Tracef("Awaiting catchup after resending the queue")
@@ -362,7 +367,7 @@ func (q *queue) noPingPackets(base, top uint8) bool {
362367
}
363368

364369
// processACK processes an incoming ACK of a given sequence number.
365-
func (q *queue) processACK(seq uint8, resendTimeout time.Duration) bool {
370+
func (q *queue) processACK(seq uint8) bool {
366371
// If our queue is empty, an ACK should not have any effect.
367372
if q.size() == 0 {
368373
log.Tracef("Received ack %d, but queue is empty. Ignoring.", seq)
@@ -377,7 +382,7 @@ func (q *queue) processACK(seq uint8, resendTimeout time.Duration) bool {
377382
if seq == q.awaitedACK && q.awaitingCatchUp {
378383
log.Tracef("Got awaited ACK")
379384

380-
q.proceedAfterTime(resendTimeout)
385+
q.proceedAfterTime()
381386
}
382387
q.awaitingCatchUpMu.RUnlock()
383388

@@ -485,7 +490,7 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {
485490

486491
// proceedAfterTime will wait for the resendTimeout and then send an
487492
// awaitedACKSignal, if we're still awaiting the resend catch up.
488-
func (q *queue) proceedAfterTime(resendTimeout time.Duration) {
493+
func (q *queue) proceedAfterTime() {
489494
caughtUpSignal := q.caughtUpSignal
490495

491496
processAwaitedACK := func() {
@@ -522,7 +527,7 @@ func (q *queue) proceedAfterTime(resendTimeout time.Duration) {
522527
// proceedAfterTime callback, as that's the time we'd expect it to take
523528
// for the other party to respond with a NACK, if the resent last packet
524529
// in the queue would lead to a NACK.
525-
time.AfterFunc(resendTimeout, processAwaitedACK)
530+
time.AfterFunc(q.timeoutManager.GetResendTimeout(), processAwaitedACK)
526531
}
527532

528533
// containsSequence is used to determine if a number, seq, is between two other

gbn/queue_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import (
77
)
88

99
func TestQueueSize(t *testing.T) {
10-
queue := newQueue(&queueConfig{
10+
timeoutManager := NewTimeOutManager(false)
11+
cfg := &queueConfig{
1112
s: 4,
1213
sendPkt: func(packet *PacketData) error {
1314
return nil
1415
},
15-
})
16+
}
17+
18+
queue := newQueue(cfg, timeoutManager)
1619

1720
require.Equal(t, uint8(0), queue.size())
1821

0 commit comments

Comments
 (0)