Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ linters:
# guidelines. See https://github.com/mvdan/gofumpt/issues/235.
- gofumpt

# Disable gomnd even though we generally don't use magic numbers, but there
# are exceptions where this improves readability.
- gomnd

# Disable whitespace linter as it has conflict rules against our
# contribution guidelines. See https://github.com/bombsimon/wsl/issues/109.
#
Expand Down
4 changes: 4 additions & 0 deletions gbn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type config struct {
// packet.
sendToStream sendBytesFunc

// onFIN is a callback that if set, will be called once a FIN packet has
// been received and processed.
onFIN func()

// handshakeTimeout is the time after which the server or client
// will abort and restart the handshake if the expected response is
// not received from the peer.
Expand Down
95 changes: 71 additions & 24 deletions gbn/gbn_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,10 @@ func newGoBackNConn(ctx context.Context, cfg *config,
prefix := fmt.Sprintf("(%s)", loggerPrefix)
plog := build.NewPrefixLog(prefix, log)

return &GoBackNConn{
cfg: cfg,
recvDataChan: make(chan *PacketData, cfg.n),
sendDataChan: make(chan *PacketData),
sendQueue: newQueue(
cfg.n+1, defaultHandshakeTimeout, plog,
),
g := &GoBackNConn{
cfg: cfg,
recvDataChan: make(chan *PacketData, cfg.n),
sendDataChan: make(chan *PacketData),
recvTimeout: DefaultRecvTimeout,
sendTimeout: DefaultSendTimeout,
receivedACKSignal: make(chan struct{}),
Expand All @@ -106,6 +103,17 @@ func newGoBackNConn(ctx context.Context, cfg *config,
log: plog,
quit: make(chan struct{}),
}

g.sendQueue = newQueue(&queueCfg{
s: cfg.n + 1,
timeout: cfg.resendTimeout,
log: plog,
sendPkt: func(packet *PacketData) error {
return g.sendPacket(g.ctx, packet)
},
})

return g
}

// setN sets the current N to use. This _must_ be set before the handshake is
Expand All @@ -114,7 +122,14 @@ func (g *GoBackNConn) setN(n uint8) {
g.cfg.n = n
g.cfg.s = n + 1
g.recvDataChan = make(chan *PacketData, n)
g.sendQueue = newQueue(n+1, defaultHandshakeTimeout, g.log)
g.sendQueue = newQueue(&queueCfg{
s: n + 1,
timeout: g.cfg.resendTimeout,
log: g.log,
sendPkt: func(packet *PacketData) error {
return g.sendPacket(g.ctx, packet)
},
})
}

// SetSendTimeout sets the timeout used in the Send function.
Expand Down Expand Up @@ -320,6 +335,8 @@ func (g *GoBackNConn) Close() error {
// initialisation.
g.cancel()

g.sendQueue.stop()

g.wg.Wait()

if g.pingTicker != nil {
Expand Down Expand Up @@ -359,9 +376,28 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
func (g *GoBackNConn) sendPacketsForever() error {
// resendQueue re-sends the current contents of the queue.
resendQueue := func() error {
return g.sendQueue.resend(func(packet *PacketData) error {
return g.sendPacket(g.ctx, packet)
})
err := g.sendQueue.resend()
if err != nil {
return err
}

// After resending the queue, we reset the resend ticker.
// This is so that we don't immediately resend the queue again,
// if the sendQueue.resend call above took a long time to
// execute. That can happen if the function was awaiting the
// expected ACK for a long time, or times out while awaiting the
// catch up.
g.resendTicker.Reset(g.cfg.resendTimeout)

// Also drain the resend signal channel, as resendTicker.Reset
// doesn't drain the channel if the ticker ticked during the
// sendQueue.resend() call above.
select {
case <-g.resendTicker.C:
default:
}

return nil
}

for {
Expand Down Expand Up @@ -478,6 +514,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
g.pongTicker.Pause()
}

g.resendTicker.Reset(g.cfg.resendTimeout)

switch m := msg.(type) {
case *PacketData:
switch m.Seq == g.recvSeq {
Expand Down Expand Up @@ -526,9 +564,19 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo

// If we recently sent a NACK for the same
// sequence number then back off.
if lastNackSeq == g.recvSeq &&
time.Since(lastNackTime) <
g.cfg.resendTimeout {
// We wait 2 times the resendTimeout before
// sending a new nack, as this case is likely
// hit if the sender is currently resending
// the queue, and therefore the threads that
// are resending the queue is likely busy with
// the resend, and therefore won't react to the
// NACK we send here in time.
sinceSent := time.Since(lastNackTime)
recentlySent := sinceSent <
g.cfg.resendTimeout*2

if lastNackSeq == g.recvSeq && recentlySent {
g.log.Tracef("Recently sent NACK")

continue
}
Expand All @@ -552,8 +600,6 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
case *PacketACK:
gotValidACK := g.sendQueue.processACK(m.Seq)
if gotValidACK {
g.resendTicker.Reset(g.cfg.resendTimeout)

// Send a signal to indicate that new
// ACKs have been received.
select {
Expand All @@ -569,15 +615,12 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
// sent was dropped, or maybe we sent a duplicate
// message. The NACK message contains the sequence
// number that the receiver was expecting.
inQueue, bumped := g.sendQueue.processNACK(m.Seq)

// If the NACK sequence number is not in our queue
// then we ignore it. We must have received the ACK
// for the sequence number in the meantime.
if !inQueue {
g.log.Tracef("NACK seq %d is not in the "+
"queue. Ignoring", m.Seq)
shouldResend, bumped := g.sendQueue.processNACK(m.Seq)

// If we don't need to resend the queue after processing
// the NACK, we can continue without sending the resend
// signal.
if !shouldResend {
continue
}

Expand Down Expand Up @@ -606,6 +649,10 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo

close(g.remoteClosed)

if g.cfg.onFIN != nil {
g.cfg.onFIN()
}

return errTransportClosing

default:
Expand Down
8 changes: 8 additions & 0 deletions gbn/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ func WithKeepalivePing(ping, pong time.Duration) Option {
conn.pongTime = pong
}
}

// WithOnFIN is used to set the onFIN callback that will be called once a FIN
// packet has been received and processed.
func WithOnFIN(fn func()) Option {
return func(conn *config) {
conn.onFIN = fn
}
}
Loading