From 352d75ddb8cc6e970a944d5fd692c3dd41387a17 Mon Sep 17 00:00:00 2001 From: Thomaz Leite Date: Tue, 18 Apr 2023 14:25:48 -0300 Subject: [PATCH 1/4] Format p2p.go --- hermes/src/network/p2p.go | 233 +++++++++++++++++++------------------- 1 file changed, 117 insertions(+), 116 deletions(-) diff --git a/hermes/src/network/p2p.go b/hermes/src/network/p2p.go index 0cb19b40eb..60e814560c 100644 --- a/hermes/src/network/p2p.go +++ b/hermes/src/network/p2p.go @@ -46,122 +46,123 @@ import ( //export RegisterObservationCallback func RegisterObservationCallback(f C.callback_t) { - go func() { - ctx := context.Background() - - // Setup base network configuration. - networkID := "/wormhole/mainnet/2" - priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) - bootstrapPeers := []string{ - "/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7", - } - - // Setup libp2p Connection Manager. - mgr, err := connmgr.NewConnManager( - 100, - 400, - connmgr.WithGracePeriod(0), - ) - - if err != nil { - err := fmt.Errorf("Failed to create connection manager: %w", err) - fmt.Println(err) - return - } - - // Setup libp2p Reactor. - h, err := libp2p.New( - libp2p.Identity(priv), - libp2p.ListenAddrStrings( - "/ip4/0.0.0.0/udp/30910/quic", - "/ip6/::/udp/30910/quic", - ), - libp2p.Security(libp2ptls.ID, libp2ptls.New), - libp2p.Transport(libp2pquic.NewTransport), - libp2p.ConnectionManager(mgr), - libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - bootstrappers := make([]peer.AddrInfo, 0) - for _, addr := range bootstrapPeers { - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - continue - } - - pi, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil || pi.ID == h.ID() { - continue - } - - bootstrappers = append(bootstrappers, *pi) - } - idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), - dht.ProtocolPrefix(protocol.ID("/"+networkID)), - dht.BootstrapPeers(bootstrappers...), - ) - return idht, err - }), - ) - - if err != nil { - err := fmt.Errorf("Failed to create libp2p host: %w", err) - fmt.Println(err) - return - } - - topic := fmt.Sprintf("%s/%s", networkID, "broadcast") - ps, err := pubsub.NewGossipSub(ctx, h) - if err != nil { - err := fmt.Errorf("Failed to create Pubsub: %w", err) - fmt.Println(err) - return - } - - th, err := ps.Join(topic) - if err != nil { - err := fmt.Errorf("Failed to join topic: %w", err) - fmt.Println(err) - return - } - - sub, err := th.Subscribe() - if err != nil { - err := fmt.Errorf("Failed to subscribe topic: %w", err) - fmt.Println(err) - return - } - - for { - for { - select { - case <-ctx.Done(): - return - default: - envelope, err := sub.Next(ctx) - if err != nil { - err := fmt.Errorf("Failed to receive Pubsub message: %w", err) - fmt.Println(err) - return - } - - // Definition for GossipMessage is generated by Protobuf, see `p2p.proto`. - var msg GossipMessage - err = proto.Unmarshal(envelope.Data, &msg) - - switch msg.Message.(type) { - case *GossipMessage_SignedObservation: - case *GossipMessage_SignedVaaWithQuorum: - vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa() - cBytes := C.CBytes(vaaBytes) - defer C.free(cBytes) - C.invoke(f, C.observation_t{ - vaa: (*C.char)(cBytes), - vaa_len: C.size_t(len(vaaBytes)), - }) - } - } - } - } - }() + go func() { + ctx := context.Background() + + // Setup base network configuration. + networkID := "/wormhole/testnet/2/1" + priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) + bootstrapPeers := []string{ + //"/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7", + "/dns4/wormhole-testnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWAkB9ynDur1Jtoa97LBUp8RXdhzS5uHgAfdTquJbrbN7i", + } + + // Setup libp2p Connection Manager. + mgr, err := connmgr.NewConnManager( + 100, + 400, + connmgr.WithGracePeriod(0), + ) + + if err != nil { + err := fmt.Errorf("Failed to create connection manager: %w", err) + fmt.Println(err) + return + } + + // Setup libp2p Reactor. + h, err := libp2p.New( + libp2p.Identity(priv), + libp2p.ListenAddrStrings( + "/ip4/0.0.0.0/udp/30910/quic", + "/ip6/::/udp/30910/quic", + ), + libp2p.Security(libp2ptls.ID, libp2ptls.New), + libp2p.Transport(libp2pquic.NewTransport), + libp2p.ConnectionManager(mgr), + libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { + bootstrappers := make([]peer.AddrInfo, 0) + for _, addr := range bootstrapPeers { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + continue + } + + pi, err := peer.AddrInfoFromP2pAddr(ma) + if err != nil || pi.ID == h.ID() { + continue + } + + bootstrappers = append(bootstrappers, *pi) + } + idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), + dht.ProtocolPrefix(protocol.ID("/"+networkID)), + dht.BootstrapPeers(bootstrappers...), + ) + return idht, err + }), + ) + + if err != nil { + err := fmt.Errorf("Failed to create libp2p host: %w", err) + fmt.Println(err) + return + } + + topic := fmt.Sprintf("%s/%s", networkID, "broadcast") + ps, err := pubsub.NewGossipSub(ctx, h) + if err != nil { + err := fmt.Errorf("Failed to create Pubsub: %w", err) + fmt.Println(err) + return + } + + th, err := ps.Join(topic) + if err != nil { + err := fmt.Errorf("Failed to join topic: %w", err) + fmt.Println(err) + return + } + + sub, err := th.Subscribe() + if err != nil { + err := fmt.Errorf("Failed to subscribe topic: %w", err) + fmt.Println(err) + return + } + + for { + for { + select { + case <-ctx.Done(): + return + default: + envelope, err := sub.Next(ctx) + if err != nil { + err := fmt.Errorf("Failed to receive Pubsub message: %w", err) + fmt.Println(err) + return + } + + // Definition for GossipMessage is generated by Protobuf, see `p2p.proto`. + var msg GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) + + switch msg.Message.(type) { + case *GossipMessage_SignedObservation: + case *GossipMessage_SignedVaaWithQuorum: + vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa() + cBytes := C.CBytes(vaaBytes) + defer C.free(cBytes) + C.invoke(f, C.observation_t{ + vaa: (*C.char)(cBytes), + vaa_len: C.size_t(len(vaaBytes)), + }) + } + } + } + } + }() } func main() { From e63bdfd96edc0751e077659026c6ed620a571112 Mon Sep 17 00:00:00 2001 From: Thomaz Leite Date: Tue, 18 Apr 2023 16:29:45 -0300 Subject: [PATCH 2/4] Pass Wormhole arguments from command line or env. vars --- hermes/src/config.rs | 22 +++++++++----- hermes/src/main.rs | 13 ++++++-- hermes/src/network/p2p.go | 20 ++++++------- hermes/src/network/p2p.rs | 62 ++++++++++++++++++++++++++++++++------- 4 files changed, 86 insertions(+), 31 deletions(-) diff --git a/hermes/src/config.rs b/hermes/src/config.rs index 905ef6682c..6fa3d35f3e 100644 --- a/hermes/src/config.rs +++ b/hermes/src/config.rs @@ -24,13 +24,21 @@ pub enum Options { #[structopt(long)] id_secp256k1: Option, - /// Multiaddress for a Wormhole bootstrap peer. - #[structopt(long)] - wormhole_peer: Option, - - /// Multiaddress to bind Wormhole P2P to. - #[structopt(long)] - wormhole_addr: Option, + /// Network ID for Wormhole + #[structopt(long, env = "WORMHOLE_NETWORK_ID")] + wh_network_id: String, + + /// Multiaddresses for Wormhole bootstrap peers (separated by comma). + #[structopt(long, env = "WORMHOLE_BOOTSTRAP_ADDRS")] + wh_bootstrap_addrs: String, + + /// Multiaddresses to bind Wormhole P2P to (separated by comma) + #[structopt( + long, + default_value = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic", + env = "WORMHOLE_LISTEN_ADDRS" + )] + wh_listen_addrs: String, /// The address to bind the RPC server to. #[structopt(long, default_value = "127.0.0.1:33999")] diff --git a/hermes/src/main.rs b/hermes/src/main.rs index 84095618c3..c6faa8942b 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -51,8 +51,9 @@ async fn init(_update_channel: Receiver) -> Result<()> { config::Options::Run { id: _, id_secp256k1: _, - wormhole_addr: _, - wormhole_peer: _, + wh_network_id, + wh_bootstrap_addrs, + wh_listen_addrs, rpc_addr, p2p_addr, p2p_peer: _, @@ -61,7 +62,13 @@ async fn init(_update_channel: Receiver) -> Result<()> { // Spawn the P2P layer. log::info!("Starting P2P server on {}", p2p_addr); - network::p2p::spawn(handle_message).await?; + network::p2p::spawn( + handle_message, + wh_network_id.to_string(), + wh_bootstrap_addrs.to_string(), + wh_listen_addrs.to_string(), + ) + .await?; // Spawn the RPC server. log::info!("Starting RPC server on {}", rpc_addr); diff --git a/hermes/src/network/p2p.go b/hermes/src/network/p2p.go index 60e814560c..5895930a57 100644 --- a/hermes/src/network/p2p.go +++ b/hermes/src/network/p2p.go @@ -8,6 +8,7 @@ package main // #include +// #include // // // A structure containing Wormhole VAA observations. This must match on both // // the Go and Rust side. @@ -27,6 +28,7 @@ import "C" import ( "context" "fmt" + "strings" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" @@ -45,17 +47,16 @@ import ( ) //export RegisterObservationCallback -func RegisterObservationCallback(f C.callback_t) { +func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, listen_addrs *C.char) { go func() { ctx := context.Background() + networkID := C.GoString(network_id) + bootstrapAddrs := strings.Split(C.GoString(bootstrap_addrs), ",") + listenAddrs := strings.Split(C.GoString(listen_addrs), ",") + // Setup base network configuration. - networkID := "/wormhole/testnet/2/1" priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) - bootstrapPeers := []string{ - //"/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7", - "/dns4/wormhole-testnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWAkB9ynDur1Jtoa97LBUp8RXdhzS5uHgAfdTquJbrbN7i", - } // Setup libp2p Connection Manager. mgr, err := connmgr.NewConnManager( @@ -73,16 +74,13 @@ func RegisterObservationCallback(f C.callback_t) { // Setup libp2p Reactor. h, err := libp2p.New( libp2p.Identity(priv), - libp2p.ListenAddrStrings( - "/ip4/0.0.0.0/udp/30910/quic", - "/ip6/::/udp/30910/quic", - ), + libp2p.ListenAddrStrings(listenAddrs...), libp2p.Security(libp2ptls.ID, libp2ptls.New), libp2p.Transport(libp2pquic.NewTransport), libp2p.ConnectionManager(mgr), libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { bootstrappers := make([]peer.AddrInfo, 0) - for _, addr := range bootstrapPeers { + for _, addr := range bootstrapAddrs { ma, err := multiaddr.NewMultiaddr(addr) if err != nil { continue diff --git a/hermes/src/network/p2p.rs b/hermes/src/network/p2p.rs index ac3fcda437..5f341aa939 100644 --- a/hermes/src/network/p2p.rs +++ b/hermes/src/network/p2p.rs @@ -11,17 +11,28 @@ use { anyhow::Result, - std::sync::{ - mpsc::{ - Receiver, - Sender, + std::{ + ffi::{ + c_char, + CString, + }, + sync::{ + mpsc::{ + Receiver, + Sender, + }, + Mutex, }, - Mutex, }, }; extern "C" { - fn RegisterObservationCallback(cb: extern "C" fn(o: ObservationC)); + fn RegisterObservationCallback( + cb: extern "C" fn(o: ObservationC), + network_id: *const c_char, + bootstrap_addrs: *const c_char, + listen_addrs: *const c_char, + ); } // An `Observation` C type passed back to us from Go. @@ -64,22 +75,53 @@ extern "C" fn proxy(o: ObservationC) { /// TODO: handle_message should be capable of handling more than just Observations. But we don't /// have our own P2P network, we pass it in to keep the code structure and read directly from the /// OBSERVATIONS channel in the RPC for now. -pub fn bootstrap(_handle_message: H) -> Result<()> +pub fn bootstrap( + _handle_message: H, + network_id: String, + wh_bootstrap_addrs: String, + wh_listen_addrs: String, +) -> Result<()> where H: Fn(Observation) -> Result<()> + 'static, { + log::warn!("Network ID: {:?}", network_id); + let c_network_id = CString::new(network_id)?; + let c_wh_bootstrap_addrs = CString::new(wh_bootstrap_addrs)?; + let c_wh_listen_addrs = CString::new(wh_listen_addrs)?; + // Launch the Go LibP2P Reactor. unsafe { - RegisterObservationCallback(proxy as extern "C" fn(o: ObservationC)); + RegisterObservationCallback( + proxy as extern "C" fn(observation: ObservationC), + c_network_id.as_ptr(), + c_wh_bootstrap_addrs.as_ptr(), + c_wh_listen_addrs.as_ptr(), + ); + + // The memory will be freed when the Go function finishes using the + // pointers since C.GoString creates a copy of the strings. + std::mem::forget(c_network_id); + std::mem::forget(c_wh_bootstrap_addrs); + std::mem::forget(c_wh_listen_addrs); } Ok(()) } // Spawn's the P2P layer as a separate thread via Go. -pub async fn spawn(handle_message: H) -> Result<()> +pub async fn spawn( + handle_message: H, + network_id: String, + wh_bootstrap_addrs: String, + wh_listen_addrs: String, +) -> Result<()> where H: Fn(Observation) -> Result<()> + Send + 'static, { - bootstrap(handle_message)?; + bootstrap( + handle_message, + network_id, + wh_bootstrap_addrs, + wh_listen_addrs, + )?; Ok(()) } From 384f3895fb67791db0e1c1541bcf6f9aea93fb2b Mon Sep 17 00:00:00 2001 From: Thomaz Leite Date: Tue, 18 Apr 2023 17:27:32 -0300 Subject: [PATCH 3/4] Remove forget calls and let memory be freed (also remove confusing comment) --- hermes/src/network/p2p.go | 8 ++++---- hermes/src/network/p2p.rs | 7 ------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/hermes/src/network/p2p.go b/hermes/src/network/p2p.go index 5895930a57..3f06d5e4c0 100644 --- a/hermes/src/network/p2p.go +++ b/hermes/src/network/p2p.go @@ -48,13 +48,13 @@ import ( //export RegisterObservationCallback func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, listen_addrs *C.char) { + networkID := C.GoString(network_id) + bootstrapAddrs := strings.Split(C.GoString(bootstrap_addrs), ",") + listenAddrs := strings.Split(C.GoString(listen_addrs), ",") + go func() { ctx := context.Background() - networkID := C.GoString(network_id) - bootstrapAddrs := strings.Split(C.GoString(bootstrap_addrs), ",") - listenAddrs := strings.Split(C.GoString(listen_addrs), ",") - // Setup base network configuration. priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) diff --git a/hermes/src/network/p2p.rs b/hermes/src/network/p2p.rs index 5f341aa939..8cd4c457bd 100644 --- a/hermes/src/network/p2p.rs +++ b/hermes/src/network/p2p.rs @@ -84,7 +84,6 @@ pub fn bootstrap( where H: Fn(Observation) -> Result<()> + 'static, { - log::warn!("Network ID: {:?}", network_id); let c_network_id = CString::new(network_id)?; let c_wh_bootstrap_addrs = CString::new(wh_bootstrap_addrs)?; let c_wh_listen_addrs = CString::new(wh_listen_addrs)?; @@ -97,12 +96,6 @@ where c_wh_bootstrap_addrs.as_ptr(), c_wh_listen_addrs.as_ptr(), ); - - // The memory will be freed when the Go function finishes using the - // pointers since C.GoString creates a copy of the strings. - std::mem::forget(c_network_id); - std::mem::forget(c_wh_bootstrap_addrs); - std::mem::forget(c_wh_listen_addrs); } Ok(()) } From 4dc74d5b19c16defb5d695dd50d6ba75b21b9974 Mon Sep 17 00:00:00 2001 From: Thomaz Leite Date: Wed, 19 Apr 2023 07:58:18 -0300 Subject: [PATCH 4/4] Use proper types on command line arguments --- hermes/src/config.rs | 7 ++++--- hermes/src/main.rs | 4 ++-- hermes/src/network/p2p.rs | 33 +++++++++++++++++++++++---------- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/hermes/src/config.rs b/hermes/src/config.rs index 6fa3d35f3e..ec0068109f 100644 --- a/hermes/src/config.rs +++ b/hermes/src/config.rs @@ -29,16 +29,17 @@ pub enum Options { wh_network_id: String, /// Multiaddresses for Wormhole bootstrap peers (separated by comma). - #[structopt(long, env = "WORMHOLE_BOOTSTRAP_ADDRS")] - wh_bootstrap_addrs: String, + #[structopt(long, use_delimiter = true, env = "WORMHOLE_BOOTSTRAP_ADDRS")] + wh_bootstrap_addrs: Vec, /// Multiaddresses to bind Wormhole P2P to (separated by comma) #[structopt( long, + use_delimiter = true, default_value = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic", env = "WORMHOLE_LISTEN_ADDRS" )] - wh_listen_addrs: String, + wh_listen_addrs: Vec, /// The address to bind the RPC server to. #[structopt(long, default_value = "127.0.0.1:33999")] diff --git a/hermes/src/main.rs b/hermes/src/main.rs index c6faa8942b..af88990690 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -65,8 +65,8 @@ async fn init(_update_channel: Receiver) -> Result<()> { network::p2p::spawn( handle_message, wh_network_id.to_string(), - wh_bootstrap_addrs.to_string(), - wh_listen_addrs.to_string(), + wh_bootstrap_addrs, + wh_listen_addrs, ) .await?; diff --git a/hermes/src/network/p2p.rs b/hermes/src/network/p2p.rs index 8cd4c457bd..db16095b84 100644 --- a/hermes/src/network/p2p.rs +++ b/hermes/src/network/p2p.rs @@ -11,6 +11,7 @@ use { anyhow::Result, + libp2p::Multiaddr, std::{ ffi::{ c_char, @@ -78,23 +79,35 @@ extern "C" fn proxy(o: ObservationC) { pub fn bootstrap( _handle_message: H, network_id: String, - wh_bootstrap_addrs: String, - wh_listen_addrs: String, + wh_bootstrap_addrs: Vec, + wh_listen_addrs: Vec, ) -> Result<()> where H: Fn(Observation) -> Result<()> + 'static, { - let c_network_id = CString::new(network_id)?; - let c_wh_bootstrap_addrs = CString::new(wh_bootstrap_addrs)?; - let c_wh_listen_addrs = CString::new(wh_listen_addrs)?; + let network_id_cstr = CString::new(network_id)?; + let wh_bootstrap_addrs_cstr = CString::new( + wh_bootstrap_addrs + .iter() + .map(|a| a.to_string()) + .collect::>() + .join(","), + )?; + let wh_listen_addrs_cstr = CString::new( + wh_listen_addrs + .iter() + .map(|a| a.to_string()) + .collect::>() + .join(","), + )?; // Launch the Go LibP2P Reactor. unsafe { RegisterObservationCallback( proxy as extern "C" fn(observation: ObservationC), - c_network_id.as_ptr(), - c_wh_bootstrap_addrs.as_ptr(), - c_wh_listen_addrs.as_ptr(), + network_id_cstr.as_ptr(), + wh_bootstrap_addrs_cstr.as_ptr(), + wh_listen_addrs_cstr.as_ptr(), ); } Ok(()) @@ -104,8 +117,8 @@ where pub async fn spawn( handle_message: H, network_id: String, - wh_bootstrap_addrs: String, - wh_listen_addrs: String, + wh_bootstrap_addrs: Vec, + wh_listen_addrs: Vec, ) -> Result<()> where H: Fn(Observation) -> Result<()> + Send + 'static,