diff --git a/hermes/src/config.rs b/hermes/src/config.rs index 905ef6682c..ec0068109f 100644 --- a/hermes/src/config.rs +++ b/hermes/src/config.rs @@ -24,13 +24,22 @@ pub enum Options { #[structopt(long)] id_secp256k1: Option, - /// Multiaddress for a Wormhole bootstrap peer. - #[structopt(long)] - wormhole_peer: Option, + /// Network ID for Wormhole + #[structopt(long, env = "WORMHOLE_NETWORK_ID")] + wh_network_id: String, - /// Multiaddress to bind Wormhole P2P to. - #[structopt(long)] - wormhole_addr: Option, + /// Multiaddresses for Wormhole bootstrap peers (separated by comma). + #[structopt(long, use_delimiter = true, env = "WORMHOLE_BOOTSTRAP_ADDRS")] + wh_bootstrap_addrs: Vec, + + /// Multiaddresses to bind Wormhole P2P to (separated by comma) + #[structopt( + long, + use_delimiter = true, + default_value = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic", + env = "WORMHOLE_LISTEN_ADDRS" + )] + wh_listen_addrs: Vec, /// The address to bind the RPC server to. #[structopt(long, default_value = "127.0.0.1:33999")] diff --git a/hermes/src/main.rs b/hermes/src/main.rs index 84095618c3..af88990690 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -51,8 +51,9 @@ async fn init(_update_channel: Receiver) -> Result<()> { config::Options::Run { id: _, id_secp256k1: _, - wormhole_addr: _, - wormhole_peer: _, + wh_network_id, + wh_bootstrap_addrs, + wh_listen_addrs, rpc_addr, p2p_addr, p2p_peer: _, @@ -61,7 +62,13 @@ async fn init(_update_channel: Receiver) -> Result<()> { // Spawn the P2P layer. log::info!("Starting P2P server on {}", p2p_addr); - network::p2p::spawn(handle_message).await?; + network::p2p::spawn( + handle_message, + wh_network_id.to_string(), + wh_bootstrap_addrs, + wh_listen_addrs, + ) + .await?; // Spawn the RPC server. log::info!("Starting RPC server on {}", rpc_addr); diff --git a/hermes/src/network/p2p.go b/hermes/src/network/p2p.go index 0cb19b40eb..3f06d5e4c0 100644 --- a/hermes/src/network/p2p.go +++ b/hermes/src/network/p2p.go @@ -8,6 +8,7 @@ package main // #include +// #include // // // A structure containing Wormhole VAA observations. This must match on both // // the Go and Rust side. @@ -27,6 +28,7 @@ import "C" import ( "context" "fmt" + "strings" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" @@ -45,123 +47,120 @@ import ( ) //export RegisterObservationCallback -func RegisterObservationCallback(f C.callback_t) { - go func() { - ctx := context.Background() - - // Setup base network configuration. - networkID := "/wormhole/mainnet/2" - priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) - bootstrapPeers := []string{ - "/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7", - } - - // Setup libp2p Connection Manager. - mgr, err := connmgr.NewConnManager( - 100, - 400, - connmgr.WithGracePeriod(0), - ) - - if err != nil { - err := fmt.Errorf("Failed to create connection manager: %w", err) - fmt.Println(err) - return - } - - // Setup libp2p Reactor. - h, err := libp2p.New( - libp2p.Identity(priv), - libp2p.ListenAddrStrings( - "/ip4/0.0.0.0/udp/30910/quic", - "/ip6/::/udp/30910/quic", - ), - libp2p.Security(libp2ptls.ID, libp2ptls.New), - libp2p.Transport(libp2pquic.NewTransport), - libp2p.ConnectionManager(mgr), - libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - bootstrappers := make([]peer.AddrInfo, 0) - for _, addr := range bootstrapPeers { - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - continue - } - - pi, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil || pi.ID == h.ID() { - continue - } - - bootstrappers = append(bootstrappers, *pi) - } - idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), - dht.ProtocolPrefix(protocol.ID("/"+networkID)), - dht.BootstrapPeers(bootstrappers...), - ) - return idht, err - }), - ) - - if err != nil { - err := fmt.Errorf("Failed to create libp2p host: %w", err) - fmt.Println(err) - return - } - - topic := fmt.Sprintf("%s/%s", networkID, "broadcast") - ps, err := pubsub.NewGossipSub(ctx, h) - if err != nil { - err := fmt.Errorf("Failed to create Pubsub: %w", err) - fmt.Println(err) - return - } - - th, err := ps.Join(topic) - if err != nil { - err := fmt.Errorf("Failed to join topic: %w", err) - fmt.Println(err) - return - } - - sub, err := th.Subscribe() - if err != nil { - err := fmt.Errorf("Failed to subscribe topic: %w", err) - fmt.Println(err) - return - } - - for { - for { - select { - case <-ctx.Done(): - return - default: - envelope, err := sub.Next(ctx) - if err != nil { - err := fmt.Errorf("Failed to receive Pubsub message: %w", err) - fmt.Println(err) - return - } - - // Definition for GossipMessage is generated by Protobuf, see `p2p.proto`. - var msg GossipMessage - err = proto.Unmarshal(envelope.Data, &msg) - - switch msg.Message.(type) { - case *GossipMessage_SignedObservation: - case *GossipMessage_SignedVaaWithQuorum: - vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa() - cBytes := C.CBytes(vaaBytes) - defer C.free(cBytes) - C.invoke(f, C.observation_t{ - vaa: (*C.char)(cBytes), - vaa_len: C.size_t(len(vaaBytes)), - }) - } - } - } - } - }() +func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, listen_addrs *C.char) { + networkID := C.GoString(network_id) + bootstrapAddrs := strings.Split(C.GoString(bootstrap_addrs), ",") + listenAddrs := strings.Split(C.GoString(listen_addrs), ",") + + go func() { + ctx := context.Background() + + // Setup base network configuration. + priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) + + // Setup libp2p Connection Manager. + mgr, err := connmgr.NewConnManager( + 100, + 400, + connmgr.WithGracePeriod(0), + ) + + if err != nil { + err := fmt.Errorf("Failed to create connection manager: %w", err) + fmt.Println(err) + return + } + + // Setup libp2p Reactor. + h, err := libp2p.New( + libp2p.Identity(priv), + libp2p.ListenAddrStrings(listenAddrs...), + libp2p.Security(libp2ptls.ID, libp2ptls.New), + libp2p.Transport(libp2pquic.NewTransport), + libp2p.ConnectionManager(mgr), + libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { + bootstrappers := make([]peer.AddrInfo, 0) + for _, addr := range bootstrapAddrs { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + continue + } + + pi, err := peer.AddrInfoFromP2pAddr(ma) + if err != nil || pi.ID == h.ID() { + continue + } + + bootstrappers = append(bootstrappers, *pi) + } + idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), + dht.ProtocolPrefix(protocol.ID("/"+networkID)), + dht.BootstrapPeers(bootstrappers...), + ) + return idht, err + }), + ) + + if err != nil { + err := fmt.Errorf("Failed to create libp2p host: %w", err) + fmt.Println(err) + return + } + + topic := fmt.Sprintf("%s/%s", networkID, "broadcast") + ps, err := pubsub.NewGossipSub(ctx, h) + if err != nil { + err := fmt.Errorf("Failed to create Pubsub: %w", err) + fmt.Println(err) + return + } + + th, err := ps.Join(topic) + if err != nil { + err := fmt.Errorf("Failed to join topic: %w", err) + fmt.Println(err) + return + } + + sub, err := th.Subscribe() + if err != nil { + err := fmt.Errorf("Failed to subscribe topic: %w", err) + fmt.Println(err) + return + } + + for { + for { + select { + case <-ctx.Done(): + return + default: + envelope, err := sub.Next(ctx) + if err != nil { + err := fmt.Errorf("Failed to receive Pubsub message: %w", err) + fmt.Println(err) + return + } + + // Definition for GossipMessage is generated by Protobuf, see `p2p.proto`. + var msg GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) + + switch msg.Message.(type) { + case *GossipMessage_SignedObservation: + case *GossipMessage_SignedVaaWithQuorum: + vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa() + cBytes := C.CBytes(vaaBytes) + defer C.free(cBytes) + C.invoke(f, C.observation_t{ + vaa: (*C.char)(cBytes), + vaa_len: C.size_t(len(vaaBytes)), + }) + } + } + } + } + }() } func main() { diff --git a/hermes/src/network/p2p.rs b/hermes/src/network/p2p.rs index ac3fcda437..db16095b84 100644 --- a/hermes/src/network/p2p.rs +++ b/hermes/src/network/p2p.rs @@ -11,17 +11,29 @@ use { anyhow::Result, - std::sync::{ - mpsc::{ - Receiver, - Sender, + libp2p::Multiaddr, + std::{ + ffi::{ + c_char, + CString, + }, + sync::{ + mpsc::{ + Receiver, + Sender, + }, + Mutex, }, - Mutex, }, }; extern "C" { - fn RegisterObservationCallback(cb: extern "C" fn(o: ObservationC)); + fn RegisterObservationCallback( + cb: extern "C" fn(o: ObservationC), + network_id: *const c_char, + bootstrap_addrs: *const c_char, + listen_addrs: *const c_char, + ); } // An `Observation` C type passed back to us from Go. @@ -64,22 +76,58 @@ extern "C" fn proxy(o: ObservationC) { /// TODO: handle_message should be capable of handling more than just Observations. But we don't /// have our own P2P network, we pass it in to keep the code structure and read directly from the /// OBSERVATIONS channel in the RPC for now. -pub fn bootstrap(_handle_message: H) -> Result<()> +pub fn bootstrap( + _handle_message: H, + network_id: String, + wh_bootstrap_addrs: Vec, + wh_listen_addrs: Vec, +) -> Result<()> where H: Fn(Observation) -> Result<()> + 'static, { + let network_id_cstr = CString::new(network_id)?; + let wh_bootstrap_addrs_cstr = CString::new( + wh_bootstrap_addrs + .iter() + .map(|a| a.to_string()) + .collect::>() + .join(","), + )?; + let wh_listen_addrs_cstr = CString::new( + wh_listen_addrs + .iter() + .map(|a| a.to_string()) + .collect::>() + .join(","), + )?; + // Launch the Go LibP2P Reactor. unsafe { - RegisterObservationCallback(proxy as extern "C" fn(o: ObservationC)); + RegisterObservationCallback( + proxy as extern "C" fn(observation: ObservationC), + network_id_cstr.as_ptr(), + wh_bootstrap_addrs_cstr.as_ptr(), + wh_listen_addrs_cstr.as_ptr(), + ); } Ok(()) } // Spawn's the P2P layer as a separate thread via Go. -pub async fn spawn(handle_message: H) -> Result<()> +pub async fn spawn( + handle_message: H, + network_id: String, + wh_bootstrap_addrs: Vec, + wh_listen_addrs: Vec, +) -> Result<()> where H: Fn(Observation) -> Result<()> + Send + 'static, { - bootstrap(handle_message)?; + bootstrap( + handle_message, + network_id, + wh_bootstrap_addrs, + wh_listen_addrs, + )?; Ok(()) }