|
| 1 | +package io.iohk.ethereum.network |
| 2 | + |
| 3 | +import io.iohk.ethereum.utils.Logger |
| 4 | +import java.net.InetAddress |
| 5 | +import java.util.concurrent.ExecutorService |
| 6 | +import monix.eval.Task |
| 7 | +import org.jupnp.DefaultUpnpServiceConfiguration |
| 8 | +import org.jupnp.support.igd.PortMappingListener |
| 9 | +import org.jupnp.support.model.PortMapping |
| 10 | +import org.jupnp.support.model.PortMapping.Protocol.{TCP, UDP} |
| 11 | +import org.jupnp.tool.transport.JDKTransportConfiguration |
| 12 | +import org.jupnp.transport.Router |
| 13 | +import org.jupnp.transport.spi.NetworkAddressFactory |
| 14 | +import org.jupnp.transport.spi.StreamClient |
| 15 | +import org.jupnp.transport.spi.StreamClientConfiguration |
| 16 | +import org.jupnp.transport.spi.StreamServer |
| 17 | +import org.jupnp.transport.spi.StreamServerConfiguration |
| 18 | +import org.jupnp.UpnpServiceImpl |
| 19 | +import scala.jdk.CollectionConverters._ |
| 20 | +import scala.util.chaining._ |
| 21 | +import org.jupnp.QueueingThreadPoolExecutor |
| 22 | +import cats.effect.Resource |
| 23 | +import org.jupnp.UpnpService |
| 24 | +import cats.implicits._ |
| 25 | + |
| 26 | +private class ClientOnlyUpnpServiceConfiguration extends DefaultUpnpServiceConfiguration() { |
| 27 | + private final val THREAD_POOL_SIZE = 4 // seemingly the minimum required to perform port mapping |
| 28 | + |
| 29 | + override def createDefaultExecutorService(): ExecutorService = |
| 30 | + QueueingThreadPoolExecutor.createInstance("mantis-jupnp", THREAD_POOL_SIZE); |
| 31 | + |
| 32 | + override def createStreamClient(): StreamClient[_ <: StreamClientConfiguration] = |
| 33 | + JDKTransportConfiguration.INSTANCE.createStreamClient(getSyncProtocolExecutorService()) |
| 34 | + |
| 35 | + override def createStreamServer(networkAddressFactory: NetworkAddressFactory): NoStreamServer.type = |
| 36 | + NoStreamServer // prevent a StreamServer from running needlessly |
| 37 | +} |
| 38 | + |
| 39 | +private object NoStreamServer extends StreamServer[StreamServerConfiguration] { |
| 40 | + def run(): Unit = () |
| 41 | + def init(_1: InetAddress, _2: Router): Unit = () |
| 42 | + def getPort(): Int = 0 |
| 43 | + def stop(): Unit = () |
| 44 | + def getConfiguration(): StreamServerConfiguration = new StreamServerConfiguration { |
| 45 | + def getListenPort(): Int = 0 |
| 46 | + } |
| 47 | +} |
| 48 | + |
| 49 | +object PortForwarder extends Logger { |
| 50 | + private final val description = "Mantis" |
| 51 | + |
| 52 | + def openPorts(tcpPorts: Seq[Int], udpPorts: Seq[Int]): Resource[Task, Unit] = |
| 53 | + Resource.make(startForwarding(tcpPorts, udpPorts))(stopForwarding).void |
| 54 | + |
| 55 | + private def startForwarding(tcpPorts: Seq[Int], udpPorts: Seq[Int]): Task[UpnpService] = Task { |
| 56 | + log.info("Attempting port forwarding for TCP ports {} and UDP ports {}", tcpPorts, udpPorts) |
| 57 | + new UpnpServiceImpl(new ClientOnlyUpnpServiceConfiguration()).tap { service => |
| 58 | + service.startup() |
| 59 | + |
| 60 | + val bindAddresses = |
| 61 | + service |
| 62 | + .getConfiguration() |
| 63 | + .createNetworkAddressFactory() |
| 64 | + .getBindAddresses() |
| 65 | + .asScala |
| 66 | + .map(_.getHostAddress()) |
| 67 | + .toArray |
| 68 | + |
| 69 | + val portMappings = for { |
| 70 | + address <- bindAddresses |
| 71 | + (port, protocol) <- tcpPorts.map(_ -> TCP) ++ udpPorts.map(_ -> UDP) |
| 72 | + } yield new PortMapping(port, address, protocol).tap(_.setDescription(description)) |
| 73 | + |
| 74 | + service.getRegistry().addListener(new PortMappingListener(portMappings)) |
| 75 | + } |
| 76 | + } |
| 77 | + |
| 78 | + private def stopForwarding(service: UpnpService) = Task { |
| 79 | + service.shutdown() |
| 80 | + } |
| 81 | +} |
0 commit comments