@@ -20,8 +20,8 @@ use crate::validators::{
20
20
MismatchedRackIdError , ReconfigurationError , ValidatedReconfigureMsg ,
21
21
} ;
22
22
use crate :: {
23
- Alarm , Configuration , CoordinatorState , Epoch , NodeHandlerCtx , PlatformId ,
24
- messages:: * ,
23
+ Alarm , Configuration , CoordinatorState , Epoch , ExpungedMetadata ,
24
+ NodeHandlerCtx , PlatformId , messages:: * ,
25
25
} ;
26
26
use daft:: { Diffable , Leaf } ;
27
27
use gfss:: shamir:: Share ;
@@ -243,13 +243,26 @@ impl Node {
243
243
from : PlatformId ,
244
244
msg : PeerMsg ,
245
245
) {
246
+ if ctx. persistent_state ( ) . is_expunged ( ) {
247
+ warn ! (
248
+ self . log,
249
+ "Received message while expunged. Dropping." ;
250
+ "from" => %from,
251
+ "msg" => msg. kind. name( )
252
+ ) ;
253
+ return ;
254
+ }
255
+
246
256
if let Some ( rack_id) = ctx. persistent_state ( ) . rack_id ( ) {
247
257
if rack_id != msg. rack_id {
248
- error ! ( self . log, "Mismatched rack id" ;
249
- "from" => %from,
250
- "msg" => msg. kind. name( ) ,
251
- "expected" => %rack_id,
252
- "got" => %msg. rack_id) ;
258
+ error ! (
259
+ self . log,
260
+ "Mismatched rack id" ;
261
+ "from" => %from,
262
+ "msg" => msg. kind. name( ) ,
263
+ "expected" => %rack_id,
264
+ "got" => %msg. rack_id
265
+ ) ;
253
266
return ;
254
267
}
255
268
}
@@ -269,6 +282,9 @@ impl Node {
269
282
PeerMsgKind :: CommitAdvance ( config) => {
270
283
self . handle_commit_advance ( ctx, from, config)
271
284
}
285
+ PeerMsgKind :: Expunged ( epoch) => {
286
+ self . handle_expunged ( ctx, from, epoch) ;
287
+ }
272
288
_ => todo ! (
273
289
"cannot handle message variant yet - not implemented: {msg:?}"
274
290
) ,
@@ -308,6 +324,81 @@ impl Node {
308
324
}
309
325
}
310
326
327
+ fn handle_expunged (
328
+ & mut self ,
329
+ ctx : & mut impl NodeHandlerCtx ,
330
+ from : PlatformId ,
331
+ epoch : Epoch ,
332
+ ) {
333
+ if let Some ( config) = ctx. persistent_state ( ) . latest_config ( ) {
334
+ if epoch < config. epoch {
335
+ // It's possible, but unlikely, that we were expunged at `epoch`
336
+ // and later re-added to the trust-quorum, but the reply to
337
+ // an old message is still floating in the network. This is
338
+ // especially unlikely since, we should really have restarted
339
+ // sprockets connections in this case. In any event, the race
340
+ // condition exists at the protocol level, and so we handle it.
341
+ if config. members . contains_key ( ctx. platform_id ( ) ) {
342
+ let m = concat ! (
343
+ "Received Expunged message for old epoch. " ,
344
+ "We must have been re-added as a trust-quorum member."
345
+ ) ;
346
+ warn ! (
347
+ self . log,
348
+ "{m}" ;
349
+ "from" => %from,
350
+ "received_epoch" => %epoch,
351
+ "epoch" => %config. epoch
352
+ ) ;
353
+ }
354
+ return ;
355
+ } else if epoch > config. epoch {
356
+ let m = concat ! (
357
+ "Received Expunged message for newer epoch. " ,
358
+ "Recording expungement in persistent state."
359
+ ) ;
360
+ warn ! (
361
+ self . log,
362
+ "{m}" ;
363
+ "from" => %from,
364
+ "received_epoch" => %epoch,
365
+ "epoch" => %config. epoch
366
+ ) ;
367
+ // Intentionally fall through
368
+ } else {
369
+ let m = concat ! (
370
+ "Received Expunged message for latest known epoch. " ,
371
+ "Recording expungement in persistent state."
372
+ ) ;
373
+ warn ! (
374
+ self . log,
375
+ "{m}" ;
376
+ "from" => %from,
377
+ "received_epoch" => %epoch,
378
+ "epoch" => %config. epoch
379
+ ) ;
380
+ // Intentionally fall through
381
+ }
382
+
383
+ // Perform the actual expunge
384
+ ctx. update_persistent_state ( |ps| {
385
+ ps. expunged = Some ( ExpungedMetadata { epoch, from } ) ;
386
+ true
387
+ } ) ;
388
+ } else {
389
+ let m = concat ! (
390
+ "Received Expunge message, but we have no configurations. " ,
391
+ "We must have been factory reset already."
392
+ ) ;
393
+ error ! (
394
+ self . log,
395
+ "{m}" ;
396
+ "from" => %from,
397
+ "received_epoch" => %epoch
398
+ ) ;
399
+ }
400
+ }
401
+
311
402
fn handle_commit_advance (
312
403
& mut self ,
313
404
ctx : & mut impl NodeHandlerCtx ,
@@ -469,7 +560,10 @@ impl Node {
469
560
%latest_committed_config. epoch,
470
561
"requested_epoch" => %epoch
471
562
) ;
472
- // TODO: Send an expunged message
563
+ ctx. send (
564
+ from,
565
+ PeerMsgKind :: Expunged ( latest_committed_config. epoch ) ,
566
+ ) ;
473
567
return ;
474
568
}
475
569
info ! (
@@ -499,7 +593,13 @@ impl Node {
499
593
"from" => %from,
500
594
"epoch" => %epoch
501
595
) ;
502
- // TODO: Send an expunged message
596
+ // Technically, this node does not yet know that the
597
+ // configuration at `epoch` has been committed. However,
598
+ // requesting nodes only ask for key shares when they know that
599
+ // the configuration has been committed. Therefore, rather than
600
+ // introduce a new message such as `NotAMember`, we inform the
601
+ // requesting node that they have been expunged.
602
+ ctx. send ( from, PeerMsgKind :: Expunged ( epoch) ) ;
503
603
return ;
504
604
}
505
605
}
0 commit comments