Skip to content

Commit dbcc05e

Browse files
author
Jaap van der Plas
committed
[ETCM-1053] extend PeerEventBus to support Akka Streams and use it for the Peer case class
1 parent 778bcf8 commit dbcc05e

File tree

8 files changed

+136
-24
lines changed

8 files changed

+136
-24
lines changed

src/main/scala/io/iohk/ethereum/network/Peer.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ package io.iohk.ethereum.network
22

33
import java.net.InetSocketAddress
44

5+
import akka.NotUsed
56
import akka.actor.ActorRef
7+
import akka.pattern.Patterns.ask
8+
import akka.stream.scaladsl.Source
69
import akka.util.ByteString
710

811
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId
12+
import io.iohk.ethereum.network.p2p.Message
913

1014
final case class PeerId(value: String) extends BlacklistId
1115

@@ -18,6 +22,7 @@ final case class Peer(
1822
remoteAddress: InetSocketAddress,
1923
ref: ActorRef,
2024
incomingConnection: Boolean,
25+
source: Source[Message, NotUsed] = Source.empty,
2126
nodeId: Option[ByteString] = None,
2227
createTimeMillis: Long = System.currentTimeMillis
2328
)

src/main/scala/io/iohk/ethereum/network/PeerActor.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package io.iohk.ethereum.network
33
import java.net.InetSocketAddress
44
import java.net.URI
55

6+
import akka.NotUsed
67
import akka.actor.SupervisorStrategy.Escalate
78
import akka.actor._
9+
import akka.stream.OverflowStrategy
10+
import akka.stream.scaladsl.Source
811
import akka.util.ByteString
912

1013
import org.bouncycastle.util.encoders.Hex
@@ -21,6 +24,7 @@ import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
2124
import io.iohk.ethereum.network.handshaker.Handshaker.NextMessage
2225
import io.iohk.ethereum.network.p2p._
2326
import io.iohk.ethereum.network.p2p.messages.Capability
27+
import io.iohk.ethereum.network.p2p.messages.Codes
2428
import io.iohk.ethereum.network.p2p.messages.WireProtocol._
2529
import io.iohk.ethereum.network.rlpx.AuthHandshaker
2630
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler
@@ -289,7 +293,17 @@ class PeerActor[R <: HandshakeResult](
289293
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {
290294

291295
val peerId: PeerId = PeerId(Hex.toHexString(remoteNodeId.toArray))
292-
val peer: Peer = Peer(peerId, peerAddress, self, incomingConnection, Some(remoteNodeId))
296+
val source: Source[Message, NotUsed] = PeerEventBusActor
297+
.messageSource(
298+
peerEventBus,
299+
PeerEventBusActor.SubscriptionClassifier
300+
.MessageClassifier(
301+
Set(Codes.BlockBodiesCode, Codes.BlockHeadersCode),
302+
PeerEventBusActor.PeerSelector.WithId(peerId)
303+
)
304+
)
305+
.map(_.message)
306+
val peer: Peer = Peer(peerId, peerAddress, self, incomingConnection, source, Some(remoteNodeId))
293307
peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult))
294308

295309
/** main behavior of actor that handles peer communication and subscriptions for messages

src/main/scala/io/iohk/ethereum/network/PeerEventBusActor.scala

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,55 @@
11
package io.iohk.ethereum.network
22

3+
import akka.NotUsed
34
import akka.actor.Actor
45
import akka.actor.ActorRef
56
import akka.actor.Props
7+
import akka.actor.Terminated
68
import akka.event.ActorEventBus
9+
import akka.stream.OverflowStrategy
10+
import akka.stream.WatchedActorTerminatedException
11+
import akka.stream.scaladsl.Source
12+
import akka.util.Timeout
13+
14+
import scala.concurrent.Future
715

816
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
917
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
1018
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerHandshakeSuccessful
1119
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier._
1220
import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
1321
import io.iohk.ethereum.network.p2p.Message
22+
import io.iohk.ethereum.network.p2p.messages.Codes
1423

1524
object PeerEventBusActor {
1625
def props: Props = Props(new PeerEventBusActor)
1726

27+
/** Handle subscription to the peer event bus via Akka Streams.
28+
*
29+
* @param peerEventBus ref to PeerEventBusActor
30+
* @param messageClassifier specify which messages to subscribe to
31+
* @return Source that subscribes to the peer event bus on materialization
32+
* and unsubscribes on cancellation. It will complete when the event bus
33+
* actor terminates.
34+
*
35+
* Note:
36+
* - subscription is asynchronous so it may miss messages when starting.
37+
* - it does not complete when a specified peerId disconnects.
38+
*/
39+
def messageSource(peerEventBus: ActorRef, messageClassifier: MessageClassifier): Source[MessageFromPeer, NotUsed] =
40+
Source
41+
.fromMaterializer { (mat, _) =>
42+
import mat.executionContext
43+
val (actorRef, src) = Source
44+
.actorRef[MessageFromPeer](1, OverflowStrategy.fail)
45+
.watch(peerEventBus)
46+
.preMaterialize()(mat)
47+
peerEventBus
48+
.tell(Subscribe(messageClassifier), actorRef)
49+
src
50+
}
51+
.mapMaterializedValue(_ => NotUsed)
52+
1853
sealed trait PeerSelector {
1954
def contains(peerId: PeerId): Boolean
2055
}
@@ -28,7 +63,6 @@ object PeerEventBusActor {
2863
case class WithId(peerId: PeerId) extends PeerSelector {
2964
override def contains(p: PeerId): Boolean = p == peerId
3065
}
31-
3266
}
3367

3468
sealed trait SubscriptionClassifier
@@ -196,20 +230,28 @@ object PeerEventBusActor {
196230
case class Unsubscribe(from: Option[SubscriptionClassifier] = None)
197231

198232
case class Publish(ev: PeerEvent)
199-
200233
}
201234

202235
class PeerEventBusActor extends Actor {
203-
204236
import PeerEventBusActor._
205237

206238
val peerEventBus: PeerEventBus = new PeerEventBus
207239

208240
override def receive: Receive = {
209-
case Subscribe(to) => peerEventBus.subscribe(sender(), to)
210-
case Unsubscribe(Some(from)) => peerEventBus.unsubscribe(sender(), from)
211-
case Unsubscribe(None) => peerEventBus.unsubscribe(sender())
212-
case Publish(ev: PeerEvent) => peerEventBus.publish(ev)
213-
}
241+
case Subscribe(to) =>
242+
peerEventBus.subscribe(sender(), to)
243+
context.watch(sender())
244+
245+
case Unsubscribe(Some(from)) =>
246+
peerEventBus.unsubscribe(sender(), from)
247+
248+
case Unsubscribe(None) =>
249+
peerEventBus.unsubscribe(sender())
214250

251+
case Publish(ev: PeerEvent) =>
252+
peerEventBus.publish(ev)
253+
254+
case Terminated(ref) =>
255+
peerEventBus.unsubscribe(ref)
256+
}
215257
}

src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import java.util.Collections.newSetFromMap
66

77
import akka.actor.SupervisorStrategy.Stop
88
import akka.actor._
9+
import akka.stream.scaladsl.Source
910
import akka.util.ByteString
1011
import akka.util.Timeout
1112

@@ -323,9 +324,7 @@ class PeerManagerActor(
323324
PeerId.fromRef(ref),
324325
address,
325326
ref,
326-
incomingConnection,
327-
nodeId = None,
328-
createTimeMillis = System.currentTimeMillis
327+
incomingConnection
329328
)
330329

331330
val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer)

src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ class FastSyncBranchResolverSpec extends AnyWordSpec with Matchers with MockFact
181181
val blocksSavedInPeer: List[Block] =
182182
commonBlocks :++ BlockHelpers.generateChain(ourBestBlock + 1 - highestCommonBlock, commonBlocks.last)
183183

184-
val dummyPeer = Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, None, 0)
184+
val dummyPeer =
185+
Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, createTimeMillis = 0)
185186

186187
val initialSearchState = SearchState(1, 10, dummyPeer)
187188
val ours = blocksSaved.map(b => (b.number, b)).toMap
@@ -256,7 +257,8 @@ class FastSyncBranchResolverSpec extends AnyWordSpec with Matchers with MockFact
256257
val blocksSaved: List[Block] = BlockHelpers.generateChain(8, BlockHelpers.genesis)
257258
val blocksSavedInPeer: List[Block] = BlockHelpers.generateChain(8, BlockHelpers.genesis)
258259

259-
val dummyPeer = Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, None, 0)
260+
val dummyPeer =
261+
Peer(PeerId("dummyPeer"), new InetSocketAddress("foo", 1), ActorRef.noSender, false, createTimeMillis = 0)
260262

261263
val initialSearchState = SearchState(1, 8, dummyPeer)
262264
val ours = blocksSaved.map(b => (b.number, b)).toMap

src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,20 +328,20 @@ class EtcPeerManagerSpec extends AnyFlatSpec with Matchers {
328328

329329
val peer1Probe: TestProbe = TestProbe()
330330
val peer1: Peer =
331-
Peer(PeerId("peer1"), new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false, Some(fakeNodeId))
331+
Peer(PeerId("peer1"), new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false, nodeId = Some(fakeNodeId))
332332
val peer1Info: PeerInfo = initialPeerInfo.withForkAccepted(false)
333333
val peer1InfoETC64: PeerInfo = initialPeerInfoETC64.withForkAccepted(false)
334334
val peer2Probe: TestProbe = TestProbe()
335335
val peer2: Peer =
336-
Peer(PeerId("peer2"), new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false, Some(fakeNodeId))
336+
Peer(PeerId("peer2"), new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false, nodeId = Some(fakeNodeId))
337337
val peer2Info: PeerInfo = initialPeerInfo.withForkAccepted(false)
338338
val peer3Probe: TestProbe = TestProbe()
339339
val peer3: Peer =
340-
Peer(PeerId("peer3"), new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false, Some(fakeNodeId))
340+
Peer(PeerId("peer3"), new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false, nodeId = Some(fakeNodeId))
341341

342342
val freshPeerProbe: TestProbe = TestProbe()
343343
val freshPeer: Peer =
344-
Peer(PeerId(""), new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false, Some(fakeNodeId))
344+
Peer(PeerId(""), new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false, nodeId = Some(fakeNodeId))
345345
val freshPeerInfo: PeerInfo = initialPeerInfo.withForkAccepted(false)
346346

347347
val peerManager: TestProbe = TestProbe()

src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,21 @@ import java.net.InetSocketAddress
44

55
import akka.actor.ActorRef
66
import akka.actor.ActorSystem
7+
import akka.actor.PoisonPill
8+
import akka.stream.WatchedActorTerminatedException
9+
import akka.stream.scaladsl.Flow
10+
import akka.stream.scaladsl.Keep
11+
import akka.stream.scaladsl.Sink
12+
import akka.stream.scaladsl.Source
713
import akka.testkit.TestProbe
814
import akka.util.ByteString
915

16+
import org.scalatest.concurrent.ScalaFutures
1017
import org.scalatest.flatspec.AnyFlatSpec
1118
import org.scalatest.matchers.should.Matchers
1219

1320
import io.iohk.ethereum.Fixtures
21+
import io.iohk.ethereum.NormalPatience
1422
import io.iohk.ethereum.domain.ChainWeight
1523
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
1624
import io.iohk.ethereum.network.EtcPeerManagerActor.RemoteStatus
@@ -19,18 +27,20 @@ import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
1927
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerHandshakeSuccessful
2028
import io.iohk.ethereum.network.PeerEventBusActor.PeerSelector
2129
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier._
30+
import io.iohk.ethereum.network.p2p.Message
2231
import io.iohk.ethereum.network.p2p.messages.Capability
2332
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Ping
2433
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Pong
2534

26-
class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
35+
class PeerEventBusActorSpec extends AnyFlatSpec with Matchers with ScalaFutures with NormalPatience {
2736

2837
"PeerEventBusActor" should "relay messages received to subscribers" in new TestSetup {
2938

3039
val probe1 = TestProbe()(system)
3140
val probe2 = TestProbe()(system)
3241
val classifier1 = MessageClassifier(Set(Ping.code), PeerSelector.WithId(PeerId("1")))
3342
val classifier2 = MessageClassifier(Set(Ping.code), PeerSelector.AllPeers)
43+
3444
peerEventBusActor.tell(PeerEventBusActor.Subscribe(classifier1), probe1.ref)
3545
peerEventBusActor.tell(PeerEventBusActor.Subscribe(classifier2), probe2.ref)
3646

@@ -46,6 +56,33 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
4656
peerEventBusActor ! PeerEventBusActor.Publish(msgFromPeer2)
4757
probe1.expectNoMessage()
4858
probe2.expectMsg(msgFromPeer2)
59+
60+
}
61+
62+
it should "relay messages via streams" in new TestSetup {
63+
val classifier1 = MessageClassifier(Set(Ping.code), PeerSelector.WithId(PeerId("1")))
64+
val classifier2 = MessageClassifier(Set(Ping.code), PeerSelector.AllPeers)
65+
66+
val completeOnTermination =
67+
Flow[MessageFromPeer].recoverWithRetries(1, { case _: WatchedActorTerminatedException => Source.empty })
68+
69+
val stream1 =
70+
PeerEventBusActor.messageSource(peerEventBusActor, classifier1).via(completeOnTermination).runWith(Sink.seq)
71+
val stream2 =
72+
PeerEventBusActor.messageSource(peerEventBusActor, classifier2).via(completeOnTermination).runWith(Sink.seq)
73+
74+
Thread.sleep(100) // stream is not subscribed right away
75+
76+
val msgFromPeer = MessageFromPeer(Ping(), PeerId("1"))
77+
peerEventBusActor ! PeerEventBusActor.Publish(msgFromPeer)
78+
79+
val msgFromPeer2 = MessageFromPeer(Ping(), PeerId("99"))
80+
peerEventBusActor ! PeerEventBusActor.Publish(msgFromPeer2)
81+
82+
peerEventBusActor ! PoisonPill
83+
84+
whenReady(stream1)(_ shouldEqual Seq(msgFromPeer))
85+
whenReady(stream2)(_ shouldEqual Seq(msgFromPeer, msgFromPeer2))
4986
}
5087

5188
it should "only relay matching message codes" in new TestSetup {
@@ -105,7 +142,13 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers {
105142
peerEventBusActor.tell(PeerEventBusActor.Subscribe(PeerHandshaked), probe2.ref)
106143

107144
val peerHandshaked =
108-
new Peer(PeerId("peer1"), new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, false, Some(ByteString()))
145+
new Peer(
146+
PeerId("peer1"),
147+
new InetSocketAddress("127.0.0.1", 0),
148+
TestProbe().ref,
149+
false,
150+
nodeId = Some(ByteString())
151+
)
109152
val msgPeerHandshaked = PeerHandshakeSuccessful(peerHandshaked, initialPeerInfo)
110153
peerEventBusActor ! PeerEventBusActor.Publish(msgPeerHandshaked)
111154

src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ class PeerManagerSpec
195195

196196
// It should have created the next peer for the first incoming connection (probably using a synchronous test scheduler).
197197
val probe2: TestProbe = createdPeers(2).probe
198-
val peer = Peer(PeerId("peer"), incomingPeerAddress1, probe2.ref, incomingConnection = true, Some(incomingNodeId1))
198+
val peer =
199+
Peer(PeerId("peer"), incomingPeerAddress1, probe2.ref, incomingConnection = true, nodeId = Some(incomingNodeId1))
199200
probe2.expectMsg(PeerActor.HandleConnection(incomingConnection1.ref, incomingPeerAddress1))
200201
probe2.reply(PeerEvent.PeerHandshakeSuccessful(peer, initialPeerInfo))
201202

@@ -213,7 +214,13 @@ class PeerManagerSpec
213214
val probe3: TestProbe = createdPeers(3).probe
214215

215216
val secondPeer =
216-
Peer(PeerId("secondPeer"), incomingPeerAddress2, probe3.ref, incomingConnection = true, Some(incomingNodeId2))
217+
Peer(
218+
PeerId("secondPeer"),
219+
incomingPeerAddress2,
220+
probe3.ref,
221+
incomingConnection = true,
222+
nodeId = Some(incomingNodeId2)
223+
)
217224

218225
probe3.expectMsg(PeerActor.HandleConnection(incomingConnection2.ref, incomingPeerAddress2))
219226
probe3.reply(PeerEvent.PeerHandshakeSuccessful(secondPeer, initialPeerInfo))
@@ -287,7 +294,7 @@ class PeerManagerSpec
287294
peerAsIncomingAddress,
288295
peerAsIncomingProbe.ref,
289296
incomingConnection = true,
290-
Some(nodeId)
297+
nodeId = Some(nodeId)
291298
)
292299

293300
peerAsIncomingProbe.expectMsg(
@@ -322,7 +329,7 @@ class PeerManagerSpec
322329
peerAsIncomingAddress,
323330
peerAsIncomingProbe.ref,
324331
incomingConnection = true,
325-
Some(nodeId)
332+
nodeId = Some(nodeId)
326333
)
327334

328335
peerAsIncomingProbe.expectMsg(

0 commit comments

Comments
 (0)