55 "errors"
66 "fmt"
77 "io"
8- "math"
98 "sync"
109 "time"
1110
@@ -41,10 +40,6 @@ type GoBackNConn struct {
4140 recvDataChan chan * PacketData
4241 sendDataChan chan * PacketData
4342
44- sendTimeout time.Duration
45- recvTimeout time.Duration
46- timeoutsMu sync.RWMutex
47-
4843 log btclog.Logger
4944
5045 // receivedACKSignal channel is used to signal that the queue size has
@@ -65,6 +60,10 @@ type GoBackNConn struct {
6560 // remoteClosed is closed if the remote party initiated the FIN sequence.
6661 remoteClosed chan struct {}
6762
63+ // timeoutManager is used to manage all the timeouts used by the
64+ // GoBackNConn.
65+ timeoutManager * TimeoutManager
66+
6867 // quit is used to stop the normal operations of the connection.
6968 // Once closed, the send and receive streams will still be available
7069 // for the FIN sequence.
@@ -84,63 +83,62 @@ func newGoBackNConn(ctx context.Context, cfg *config,
8483 prefix := fmt .Sprintf ("(%s)" , loggerPrefix )
8584 plog := build .NewPrefixLog (prefix , log )
8685
86+ timeoutManager := NewTimeOutManager (plog )
87+
8788 g := & GoBackNConn {
8889 cfg : cfg ,
8990 recvDataChan : make (chan * PacketData , cfg .n ),
9091 sendDataChan : make (chan * PacketData ),
91- recvTimeout : DefaultRecvTimeout ,
92- sendTimeout : DefaultSendTimeout ,
9392 receivedACKSignal : make (chan struct {}),
9493 resendSignal : make (chan struct {}, 1 ),
9594 remoteClosed : make (chan struct {}),
9695 ctx : ctxc ,
9796 cancel : cancel ,
9897 log : plog ,
9998 quit : make (chan struct {}),
99+ timeoutManager : timeoutManager ,
100100 }
101101
102- g .sendQueue = newQueue (& queueCfg {
103- s : cfg .n + 1 ,
104- timeout : cfg .resendTimeout ,
105- log : plog ,
106- sendPkt : func (packet * PacketData ) error {
107- return g .sendPacket (g .ctx , packet )
102+ g .sendQueue = newQueue (
103+ & queueCfg {
104+ s : cfg .n + 1 ,
105+ log : plog ,
106+ sendPkt : func (packet * PacketData ) error {
107+ return g .sendPacket (g .ctx , packet )
108+ },
108109 },
109- })
110+ timeoutManager ,
111+ )
110112
111113 return g
112114}
113115
114- // setN sets the current N to use. This _must_ be set before the handshake is
115- // completed.
116- func (g * GoBackNConn ) setN (n uint8 ) {
117- g .cfg .n = n
118- g .cfg .s = n + 1
119- g .recvDataChan = make (chan * PacketData , n )
120- g .sendQueue = newQueue (& queueCfg {
121- s : n + 1 ,
122- timeout : g .cfg .resendTimeout ,
123- log : g .log ,
124- sendPkt : func (packet * PacketData ) error {
125- return g .sendPacket (g .ctx , packet )
126- },
127- })
128- }
129-
130116// SetSendTimeout sets the timeout used in the Send function.
131117func (g * GoBackNConn ) SetSendTimeout (timeout time.Duration ) {
132- g .timeoutsMu .Lock ()
133- defer g .timeoutsMu .Unlock ()
134-
135- g .sendTimeout = timeout
118+ g .timeoutManager .SetSendTimeout (timeout )
136119}
137120
138121// SetRecvTimeout sets the timeout used in the Recv function.
139122func (g * GoBackNConn ) SetRecvTimeout (timeout time.Duration ) {
140- g .timeoutsMu . Lock ( )
141- defer g . timeoutsMu . Unlock ()
123+ g .timeoutManager . SetRecvTimeout ( timeout )
124+ }
142125
143- g .recvTimeout = timeout
126+ // setN sets the current N to use. This _must_ be set before the handshake is
127+ // completed.
128+ func (g * GoBackNConn ) setN (n uint8 ) {
129+ g .cfg .n = n
130+ g .cfg .s = n + 1
131+ g .recvDataChan = make (chan * PacketData , n )
132+ g .sendQueue = newQueue (
133+ & queueCfg {
134+ s : n + 1 ,
135+ log : g .log ,
136+ sendPkt : func (packet * PacketData ) error {
137+ return g .sendPacket (g .ctx , packet )
138+ },
139+ },
140+ g .timeoutManager ,
141+ )
144142}
145143
146144// Send blocks until an ack is received for the packet sent N packets before.
@@ -152,9 +150,7 @@ func (g *GoBackNConn) Send(data []byte) error {
152150 default :
153151 }
154152
155- g .timeoutsMu .RLock ()
156- ticker := time .NewTimer (g .sendTimeout )
157- g .timeoutsMu .RUnlock ()
153+ ticker := time .NewTimer (g .timeoutManager .GetSendTimeout ())
158154 defer ticker .Stop ()
159155
160156 sendPacket := func (packet * PacketData ) error {
@@ -216,9 +212,7 @@ func (g *GoBackNConn) Recv() ([]byte, error) {
216212 msg * PacketData
217213 )
218214
219- g .timeoutsMu .RLock ()
220- ticker := time .NewTimer (g .recvTimeout )
221- g .timeoutsMu .RUnlock ()
215+ ticker := time .NewTimer (g .timeoutManager .GetRecvTimeout ())
222216 defer ticker .Stop ()
223217
224218 for {
@@ -245,22 +239,16 @@ func (g *GoBackNConn) Recv() ([]byte, error) {
245239func (g * GoBackNConn ) start () {
246240 g .log .Debugf ("Starting" )
247241
248- pingTime := time .Duration (math .MaxInt64 )
249- if g .cfg .pingTime != 0 {
250- pingTime = g .cfg .pingTime
251- }
252-
253- g .pingTicker = NewIntervalAwareForceTicker (pingTime )
242+ g .pingTicker = NewIntervalAwareForceTicker (
243+ g .timeoutManager .GetPingTime (),
244+ )
254245 g .pingTicker .Resume ()
255246
256- pongTime := time .Duration (math .MaxInt64 )
257- if g .cfg .pongTime != 0 {
258- pongTime = g .cfg .pongTime
259- }
260-
261- g .pongTicker = NewIntervalAwareForceTicker (pongTime )
247+ g .pongTicker = NewIntervalAwareForceTicker (
248+ g .timeoutManager .GetPongTime (),
249+ )
262250
263- g .resendTicker = time .NewTicker (g .cfg . resendTimeout )
251+ g .resendTicker = time .NewTicker (g .timeoutManager . GetResendTimeout () )
264252
265253 g .wg .Add (1 )
266254 go func () {
@@ -317,7 +305,7 @@ func (g *GoBackNConn) Close() error {
317305 g .log .Tracef ("Try sending FIN" )
318306
319307 ctxc , cancel := context .WithTimeout (
320- g .ctx , defaultFinSendTimeout ,
308+ g .ctx , g . timeoutManager . GetFinSendTimeout () ,
321309 )
322310 defer cancel ()
323311 if err := g .sendPacket (ctxc , & PacketFIN {}); err != nil {
@@ -382,7 +370,7 @@ func (g *GoBackNConn) sendPacketsForever() error {
382370 // execute. That can happen if the function was awaiting the
383371 // expected ACK for a long time, or times out while awaiting the
384372 // catch up.
385- g .resendTicker .Reset (g .cfg . resendTimeout )
373+ g .resendTicker .Reset (g .timeoutManager . GetResendTimeout () )
386374
387375 // Also drain the resend signal channel, as resendTicker.Reset
388376 // doesn't drain the channel if the ticker ticked during the
@@ -509,7 +497,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
509497 g .pongTicker .Pause ()
510498 }
511499
512- g .resendTicker .Reset (g .cfg . resendTimeout )
500+ g .resendTicker .Reset (g .timeoutManager . GetResendTimeout () )
513501
514502 switch m := msg .(type ) {
515503 case * PacketData :
@@ -567,8 +555,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
567555 // the resend, and therefore won't react to the
568556 // NACK we send here in time.
569557 sinceSent := time .Since (lastNackTime )
570- recentlySent := sinceSent <
571- g .cfg .resendTimeout * 2
558+
559+ timeout := g .timeoutManager .GetResendTimeout ()
560+ recentlySent := sinceSent < timeout * 2
572561
573562 if lastNackSeq == g .recvSeq && recentlySent {
574563 g .log .Tracef ("Recently sent NACK" )
0 commit comments