Skip to content

Commit 6bb393f

Browse files
committed
gbn: add syncer struct
The syncer struct can be used to ensure that both the sender and the receiver are in sync before the waitForSync function is completed. This ensures that we don't end up in a loop where one party keeps resending packets over and over again. This is done by awaiting that we receive either the an expected ACK or an expected NACK after resending the queue. The expected ACK is the ACK for the last packet that has been resent, and the expected NACK is the NACK sequence following the last packet that has been resent. To understand why we need to await the awaited ACK/NACK after resending the queue, consider the following scenario: 1. Alice sends packets 1, 2, 3 & 4 to Bob. 2. Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs. 3. Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for packets 3 & 4 are delayed and aren't received until Alice resend timeout has passed, which leads to Alice resending packets 3 & 4. Alice will after that receive the delayed ACKs for packets 3 & 4, but will consider that as the ACKs for the resent packets, and not the original packets which they were actually sent for. If we didn't wait after resending the queue, Alice would then proceed to send more packets (5 & 6). 4. When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5. Due to latency, the packets 5 & 6 that Alice sent in step (3) above will then be received by Bob, and be processed as the correct response to the NACK 5. Bob will after that await packet 7. 5. Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is now awaiting packet 7, this send will lead to a NACK 7. But due to latency, if Alice doesn't wait resending the queue, Alice will proceed to send new packet(s) before receiving the NACK 7. 6. This resend loop would continue indefinitely, so we need to ensure that Alice waits after she has resent the queue, to ensure that she doesn't proceed to send new packets before she is sure that both parties are in sync. To ensure that we are in sync, after we have resent the queue, we will await that we either: 1. Receive a NACK for the sequence number succeeding the last packet in the resent queue i.e. in step (3) above, that would be NACK 5. OR 2. Receive an ACK for the last packet in the resent queue i.e. in step (3) above, that would be ACK 4. After we receive the expected ACK, we will then wait for the duration of the resend timeout before continuing. The reason why we wait for the resend timeout before continuing, is that the ACKs we are getting after a resend, could be delayed ACKs for the original packets we sent, and not ACKs for the resent packets. In step (3) above, the ACKs for packets 3 & 4 that Alice received were delayed ACKs for the original packets. If Alice would have immediately continued to send new packets (5 & 6) after receiving the ACK 4, she would have then received the NACK 5 from Bob which was the actual response to the resent queue. But as Alice had already continued to send packets 5 & 6 when receiving the NACK 5, the resend queue response to that NACK would cause the resend loop to continue indefinitely. When either of the 2 conditions above are met, we will consider both parties to be in sync, and we can proceed to send new packets.
1 parent ac06578 commit 6bb393f

File tree

2 files changed

+410
-0
lines changed

2 files changed

+410
-0
lines changed

gbn/syncer.go

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
package gbn
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/btcsuite/btclog"
8+
)
9+
10+
const (
11+
// awaitingTimeoutMultiplier defines the multiplier we use when
12+
// multiplying the resend timeout, resulting in duration we wait to be
13+
// sync to complete before timing out.
14+
// We set this to 3X the resend timeout. The reason we wait exactly 3X
15+
// the resend timeout is that we expect that the max time correct
16+
// behavior would take, would be:
17+
// * 1X the resendTimeout for the time it would take for the party
18+
// respond with an ACK for the last packet in the resend queue, i.e. the
19+
// awaitedACK.
20+
// * 1X the resendTimeout while waiting in proceedAfterTime before
21+
// completing the sync.
22+
// * 1X extra resendTimeout as buffer, to ensure that we have enough
23+
// time to process the ACKS/NACKS by other party + some extra margin.
24+
awaitingTimeoutMultiplier = 3
25+
)
26+
27+
type syncState uint8
28+
29+
const (
30+
// syncStateIdle defines that the syncer is idle and has not yet
31+
// initiated a resend sync.
32+
syncStateIdle syncState = iota
33+
34+
// syncStateResending defines that the syncer has initiated a resend
35+
// sync, and is awaiting that the sync is completed.
36+
syncStateResending
37+
)
38+
39+
// syncer is used to ensure that both the sender and the receiver are in sync
40+
// before the waitForSync function is completed. This is done by awaiting that
41+
// we receive either the expected ACK or NACK after resending the queue.
42+
//
43+
// To understand why we need to await the awaited ACK/NACK after resending the
44+
// queue, it ensures that we don't end up in a situation where we resend the
45+
// queue over and over again due to latency and delayed NACKs by the other
46+
// party.
47+
//
48+
// Consider the following scenario:
49+
// 1.
50+
// Alice sends packets 1, 2, 3 & 4 to Bob.
51+
// 2.
52+
// Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs.
53+
// 3.
54+
// Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for
55+
// packets 3 & 4 are delayed and aren't received until Alice resend timeout
56+
// has passed, which leads to Alice resending packets 3 & 4. Alice will after
57+
// that receive the delayed ACKs for packets 3 & 4, but will consider that as
58+
// the ACKs for the resent packets, and not the original packets which they were
59+
// actually sent for. If we didn't wait after resending the queue, Alice would
60+
// then proceed to send more packets (5 & 6).
61+
// 4.
62+
// When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5. Due
63+
// to latency, the packets 5 & 6 that Alice sent in step (3) above will then be
64+
// received by Bob, and be processed as the correct response to the NACK 5. Bob
65+
// will after that await packet 7.
66+
// 5.
67+
// Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is
68+
// now awaiting packet 7, this send will lead to a NACK 7. But due to latency,
69+
// if Alice doesn't wait resending the queue, Alice will proceed to send new
70+
// packet(s) before receiving the NACK 7.
71+
// 6.
72+
// This resend loop would continue indefinitely, so we need to ensure that Alice
73+
// waits after she has resent the queue, to ensure that she doesn't proceed to
74+
// send new packets before she is sure that both parties are in sync.
75+
//
76+
// To ensure that we are in sync, after we have resent the queue, we will await
77+
// that we either:
78+
// 1. Receive a NACK for the sequence number succeeding the last packet in the
79+
// resent queue i.e. in step (3) above, that would be NACK 5.
80+
// OR
81+
// 2. Receive an ACK for the last packet in the resent queue i.e. in step (3)
82+
// above, that would be ACK 4. After we receive the expected ACK, we will then
83+
// wait for the duration of the resend timeout before continuing. The reason why
84+
// we wait for the resend timeout before continuing, is that the ACKs we are
85+
// getting after a resend, could be delayed ACKs for the original packets we
86+
// sent, and not ACKs for the resent packets. In step (3) above, the ACKs for
87+
// packets 3 & 4 that Alice received were delayed ACKs for the original packets.
88+
// If Alice would have immediately continued to send new packets (5 & 6) after
89+
// receiving the ACK 4, she would have then received the NACK 5 from Bob which
90+
// was the actual response to the resent queue. But as Alice had already
91+
// continued to send packets 5 & 6 when receiving the NACK 5, the resend queue
92+
// response to that NACK would cause the resend loop to continue indefinitely.
93+
//
94+
// When either of the 2 conditions above are met, we will consider both parties
95+
// to be in sync.
96+
type syncer struct {
97+
s uint8
98+
log btclog.Logger
99+
timeout time.Duration
100+
101+
state syncState
102+
103+
// awaitedACK defines the sequence number for the last packet in the
104+
// resend queue. If we receive an ACK for this sequence number during
105+
// waiting to sync, we wait for the duration of the resend timeout,
106+
// and then proceed to send new packets, unless we receive the
107+
// awaitedNACK during the wait time. If that happens, we will proceed
108+
// send new packets as soon as we have processed the NACK.
109+
awaitedACK uint8
110+
111+
// awaitedNACK defines the sequence number that in case we get a NACK
112+
// with that sequence number when waiting to sync, we'd consider
113+
// the sync to be completed and we can proceed to send new packets.
114+
awaitedNACK uint8
115+
116+
// cancel is used to mark that the sync has been completed.
117+
cancel chan struct{}
118+
119+
quit chan struct{}
120+
mu sync.Mutex
121+
}
122+
123+
// newSyncer creates a new syncer instance.
124+
func newSyncer(s uint8, prefixLogger btclog.Logger, timeout time.Duration,
125+
quit chan struct{}) *syncer {
126+
127+
if prefixLogger == nil {
128+
prefixLogger = log
129+
}
130+
131+
return &syncer{
132+
s: s,
133+
log: prefixLogger,
134+
timeout: timeout,
135+
state: syncStateIdle,
136+
cancel: make(chan struct{}),
137+
quit: quit,
138+
}
139+
}
140+
141+
// reset resets the syncer state to idle and marks the sync as completed.
142+
func (c *syncer) reset() {
143+
c.mu.Lock()
144+
defer c.mu.Unlock()
145+
146+
c.resetUnsafe()
147+
}
148+
149+
// resetUnsafe resets the syncer state to idle and marks the sync as completed.
150+
//
151+
// NOTE: when calling this function, the caller must hold the syncer mutex.
152+
func (c *syncer) resetUnsafe() {
153+
c.state = syncStateIdle
154+
155+
// Cancel any pending sync.
156+
select {
157+
case c.cancel <- struct{}{}:
158+
default:
159+
}
160+
}
161+
162+
// initResendUpTo initializes the syncer to the resending state, and will after
163+
// this call be ready to await the sync to be completed when calling the
164+
// waitForSync function.
165+
// The top argument defines the sequence number of the next packet to be sent
166+
// after resending the queue.
167+
func (c *syncer) initResendUpTo(top uint8) {
168+
c.mu.Lock()
169+
defer c.mu.Unlock()
170+
171+
c.state = syncStateResending
172+
173+
// Drain the cancel channel, to reinitialize it for the new sync.
174+
select {
175+
case <-c.cancel:
176+
default:
177+
}
178+
179+
c.awaitedACK = (c.s + top - 1) % c.s
180+
c.awaitedNACK = top
181+
182+
c.log.Tracef("Set awaitedACK to %d & awaitedNACK to %d",
183+
c.awaitedACK, c.awaitedNACK)
184+
}
185+
186+
// getState returns the current state of the syncer.
187+
func (c *syncer) getState() syncState {
188+
c.mu.Lock()
189+
defer c.mu.Unlock()
190+
191+
return c.state
192+
}
193+
194+
// waitForSync waits for the sync to be completed. The sync is completed when we
195+
// receive either the awaitedNACK, the awaitedACK + resend timeout has passed,
196+
// or when timing out.
197+
func (c *syncer) waitForSync() {
198+
c.log.Tracef("Awaiting sync after resending the queue")
199+
200+
select {
201+
case <-c.quit:
202+
return
203+
204+
case <-c.cancel:
205+
c.log.Tracef("sync canceled or reset")
206+
207+
case <-time.After(c.timeout * awaitingTimeoutMultiplier):
208+
c.log.Tracef("Timed out while waiting for sync")
209+
}
210+
211+
c.reset()
212+
}
213+
214+
// processACK marks the sync as completed if the passed sequence number matches
215+
// the awaitedACK, after the resend timeout has passed.
216+
// If we are not resending or waiting after a resend, this is a no-op.
217+
func (c *syncer) processACK(seq uint8) {
218+
c.mu.Lock()
219+
defer c.mu.Unlock()
220+
221+
// If we are not resending or waiting after a resend, just swallow the
222+
// ACK.
223+
if c.state != syncStateResending {
224+
return
225+
}
226+
227+
// Else, if we are waiting but this is not the ack we are waiting for,
228+
// just swallow it.
229+
if seq != c.awaitedACK {
230+
return
231+
}
232+
233+
c.log.Tracef("Got awaited ACK")
234+
235+
// We start the proceedAfterTime function in a goroutine, as we
236+
// don't want to block the processing of other NACKs/ACKs while
237+
// we're waiting for the resend timeout to expire.
238+
go c.proceedAfterTime()
239+
}
240+
241+
// processNACK marks the sync as completed if the passed sequence number matches
242+
// the awaitedNACK.
243+
// If we are not resending or waiting after a resend, this is a no-op.
244+
func (c *syncer) processNACK(seq uint8) {
245+
c.mu.Lock()
246+
defer c.mu.Unlock()
247+
248+
// If we are not resending or waiting after a resend, just swallow the
249+
// NACK.
250+
if c.state != syncStateResending {
251+
return
252+
}
253+
254+
// Else, if we are waiting but this is not the NACK we are waiting for,
255+
// just swallow it.
256+
if seq != c.awaitedNACK {
257+
return
258+
}
259+
260+
c.log.Tracef("Got awaited NACK")
261+
262+
c.resetUnsafe()
263+
}
264+
265+
// proceedAfterTime will wait for the resendTimeout and then complete the sync,
266+
// if we haven't completed the sync yet by receiving the awaitedNACK.
267+
func (c *syncer) proceedAfterTime() {
268+
// We await for the duration of the resendTimeout before completing the
269+
// sync, as that's the time we'd expect it to take for the other party
270+
// to respond with a NACK, if the resent last packet in the
271+
// queue would lead to a NACK. If we receive the awaitedNACK
272+
// before the timeout, the cancel channel will be sent over, and we can
273+
// stop the execution early.
274+
select {
275+
case <-c.quit:
276+
return
277+
278+
case <-c.cancel:
279+
c.log.Tracef("sync succeeded or was reset")
280+
281+
// As we can't be sure that waitForSync cancel listener was
282+
// triggered before this one, we send over the cancel channel
283+
// again, to make sure that both listeners are triggered.
284+
c.reset()
285+
286+
return
287+
288+
case <-time.After(c.timeout):
289+
c.mu.Lock()
290+
defer c.mu.Unlock()
291+
292+
if c.state != syncStateResending {
293+
return
294+
}
295+
296+
c.log.Tracef("Completing sync after awaitedACK timeout")
297+
298+
c.resetUnsafe()
299+
}
300+
}

0 commit comments

Comments
 (0)