Skip to content

[icxtunnel] Add support for icx usermode tunnels #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/apoxy-dev/apoxy

go 1.24.0

toolchain go1.24.2
go 1.24.3

require (
github.com/ClickHouse/clickhouse-go/v2 v2.23.2
Expand All @@ -11,6 +9,7 @@ require (
github.com/adrg/xdg v0.5.3
github.com/alphadose/haxmap v1.4.1
github.com/anatol/vmtest v0.0.0-20250318022921-2f32244e2f0f
github.com/apoxy-dev/icx v0.5.0
github.com/avast/retry-go/v4 v4.6.1
github.com/bramvdbogaerde/go-scp v1.5.0
github.com/buraksezer/olric v0.5.6
Expand Down Expand Up @@ -46,6 +45,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-discover v0.0.0-20240726212017-342faf50e5d4
github.com/jedib0t/go-pretty/v6 v6.4.9
github.com/julienschmidt/httprouter v1.3.0
github.com/k3s-io/kine v0.13.2
github.com/kdomanski/iso9660 v0.4.0
github.com/klauspost/cpuid/v2 v2.2.10
Expand All @@ -66,8 +66,8 @@ require (
github.com/temporalio/cli v0.12.0
github.com/tetratelabs/wazero v1.7.3
github.com/things-go/go-socks5 v0.0.5
github.com/vishvananda/netlink v1.3.0
github.com/vishvananda/netns v0.0.4
github.com/vishvananda/netlink v1.3.1
github.com/vishvananda/netns v0.0.5
github.com/vmihailenco/msgpack v4.0.4+incompatible
github.com/yosida95/uritemplate/v3 v3.0.2
go.opentelemetry.io/proto/otlp v1.7.0
Expand All @@ -84,12 +84,13 @@ require (
google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gvisor.dev/gvisor v0.0.0-20250314001526-eeca54973f8b
gvisor.dev/gvisor v0.0.0-20250606001031-fa4c4dd86b43
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/apiserver v0.31.1
k8s.io/client-go v0.31.1
k8s.io/klog/v2 v2.130.1
k8s.io/kube-aggregator v0.31.1
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff
k8s.io/kubernetes v1.31.1
k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e
Expand Down Expand Up @@ -144,7 +145,7 @@ require (
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect
github.com/cilium/ebpf v0.16.0 // indirect
github.com/cilium/ebpf v0.18.0 // indirect
github.com/containerd/console v1.0.4 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand Down Expand Up @@ -303,11 +304,13 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/safchain/ethtool v0.6.1 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/seccomp/libseccomp-golang v0.10.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shengdoushi/base58 v1.0.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/slavc/xdp v0.3.4 // indirect
github.com/softlayer/softlayer-go v0.0.0-20180806151055-260589d94c7d // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.6 // indirect
Expand Down Expand Up @@ -395,7 +398,6 @@ require (
k8s.io/component-base v0.31.1 // indirect
k8s.io/component-helpers v0.31.1 // indirect
k8s.io/kms v0.33.2 // indirect
k8s.io/kube-aggregator v0.31.1 // indirect
lukechampine.com/uint128 v1.3.0 // indirect
modernc.org/cc/v3 v3.41.0 // indirect
modernc.org/ccgo/v3 v3.17.0 // indirect
Expand Down
56 changes: 26 additions & 30 deletions go.sum

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion pkg/apiserver/controllers/tunnelnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,23 @@ func (r *TunnelNodeReconciler) isNewTokenNeeded(
return true, nil
}

claims, err := r.validator.Validate(credentials.Token, subj)
claims, err := r.validator.Validate(credentials.Token)
if err != nil { // Not supposed to happen so log the issue
log.Error(err, "Token validation failed")
return true, nil
}

tokenSubj, err := claims.GetSubject()
if err != nil {
log.Error(err, "Failed to get subject from token claims")
return true, nil
}

if tokenSubj != subj {
log.Info("Token subject does not match", "expected", subj, "got", tokenSubj)
return true, nil
}

exp, err := claims.GetExpirationTime()
if err != nil {
log.Error(err, "Failed to get expiration time")
Expand Down
79 changes: 79 additions & 0 deletions pkg/tunnel/bifurcate/bifurcate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package bifurcate

import (
"net"
"sync"

"github.com/apoxy-dev/icx/geneve"
)

type packet struct {
buf []byte
addr net.Addr
}

var packetPool = sync.Pool{
New: func() any {
buf := make([]byte, 65535)
return &packet{
buf: buf,
addr: nil,
}
},
}

// Bifurcate splits incoming packets from `pc` into geneve and other channels.
func Bifurcate(pc net.PacketConn) (net.PacketConn, net.PacketConn) {
geneveConn := newChanPacketConn(pc)
otherConn := newChanPacketConn(pc)

go func() {
for {
// Get a reusable packet from the pool
p := packetPool.Get().(*packet)
p.buf = p.buf[:cap(p.buf)] // reset buffer to full capacity

n, addr, err := pc.ReadFrom(p.buf)
if err != nil {
packetPool.Put(p)
_ = geneveConn.Close()
_ = otherConn.Close()
return
}

p.addr = addr
p.buf = p.buf[:n] // trim to actual size

var targetChan chan *packet
if isGeneve(p.buf) {
targetChan = geneveConn.ch
} else {
targetChan = otherConn.ch
}

select {
case targetChan <- p:
case <-geneveConn.closed:
packetPool.Put(p)
return
case <-otherConn.closed:
packetPool.Put(p)
return
}
}
}()

return geneveConn, otherConn
}

func isGeneve(b []byte) bool {
var hdr geneve.Header
_, err := hdr.UnmarshalBinary(b)
if err != nil {
return false
}

// TODO: validate Geneve header fields if necessary

return true
}
127 changes: 127 additions & 0 deletions pkg/tunnel/bifurcate/bifurcate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package bifurcate_test

import (
"bytes"
"errors"
"net"
"testing"
"time"

"github.com/apoxy-dev/icx/geneve"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/apoxy-dev/apoxy/pkg/tunnel/bifurcate"
)

type MockPacketConn struct {
mock.Mock
readQueue chan readResult
addr net.Addr
closed bool
}

type readResult struct {
data []byte
addr net.Addr
err error
}

func NewMockPacketConn() *MockPacketConn {
return &MockPacketConn{
readQueue: make(chan readResult, 10),
addr: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 12345},
}
}

func (m *MockPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
result, ok := <-m.readQueue
if !ok {
return 0, nil, errors.New("mock read closed")
}
n := copy(p, result.data)
return n, result.addr, result.err
}

func (m *MockPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
args := m.Called(p, addr)
return args.Int(0), args.Error(1)
}

func (m *MockPacketConn) Close() error {
m.closed = true
close(m.readQueue)
return nil
}

func (m *MockPacketConn) LocalAddr() net.Addr { return m.addr }
func (m *MockPacketConn) SetDeadline(t time.Time) error { return nil }
func (m *MockPacketConn) SetReadDeadline(t time.Time) error { return nil }
func (m *MockPacketConn) SetWriteDeadline(t time.Time) error { return nil }

// --- Helpers ---

func createGenevePacket(t *testing.T) []byte {
header := geneve.Header{
Version: 0,
ProtocolType: 0x6558,
VNI: 0x123456,
NumOptions: 0,
}
buf := make([]byte, 128)
n, err := header.MarshalBinary(buf)
require.NoError(t, err)
return buf[:n]
}

func createNonGenevePacket() []byte {
return []byte("this is not a geneve packet")
}

func TestBifurcate(t *testing.T) {
t.Run("routes geneve and non-geneve packets to correct connections", func(t *testing.T) {
mockConn := NewMockPacketConn()
remote := &net.UDPAddr{IP: net.IPv4(10, 1, 1, 1), Port: 9999}

// Prepare packets
genevePkt := createGenevePacket(t)
nonGenevePkt := createNonGenevePacket()

mockConn.readQueue <- readResult{data: genevePkt, addr: remote}
mockConn.readQueue <- readResult{data: nonGenevePkt, addr: remote}

geneveConn, otherConn := bifurcate.Bifurcate(mockConn)

// Read from geneveConn
buf := make([]byte, 1024)
n, addr, err := geneveConn.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, remote.String(), addr.String())
require.True(t, bytes.HasPrefix(buf[:n], genevePkt))

// Read from otherConn
n, addr, err = otherConn.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, remote.String(), addr.String())
require.Equal(t, string(buf[:n]), string(nonGenevePkt))
})

t.Run("closes both connections on read error", func(t *testing.T) {
mockConn := NewMockPacketConn()
// simulate read error by closing channel
close(mockConn.readQueue)

geneveConn, otherConn := bifurcate.Bifurcate(mockConn)

// wait for goroutine to detect closure
time.Sleep(50 * time.Millisecond)

buf := make([]byte, 1024)

_, _, err := geneveConn.ReadFrom(buf)
require.ErrorIs(t, err, net.ErrClosed)

_, _, err = otherConn.ReadFrom(buf)
require.ErrorIs(t, err, net.ErrClosed)
})
}
61 changes: 61 additions & 0 deletions pkg/tunnel/bifurcate/chanpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package bifurcate

import (
"net"
"time"
)

type chanPacketConn struct {
pc net.PacketConn // underlying connection
ch chan *packet // incoming packets
closed chan struct{}
}

func newChanPacketConn(pc net.PacketConn) *chanPacketConn {
return &chanPacketConn{
ch: make(chan *packet, 1024),
pc: pc,
closed: make(chan struct{}),
}
}

func (c *chanPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case pkt := <-c.ch:
defer packetPool.Put(pkt) // return packet to pool
n = copy(p, pkt.buf)
return n, pkt.addr, nil
case <-c.closed:
return 0, nil, net.ErrClosed
}
}

func (c *chanPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
return c.pc.WriteTo(p, addr)
}

func (c *chanPacketConn) Close() error {
select {
case <-c.closed:
return nil
default:
close(c.closed)
return nil
}
}

func (c *chanPacketConn) LocalAddr() net.Addr {
return c.pc.LocalAddr()
}

func (c *chanPacketConn) SetDeadline(t time.Time) error {
return c.pc.SetDeadline(t)
}

func (c *chanPacketConn) SetReadDeadline(t time.Time) error {
return c.pc.SetReadDeadline(t)
}

func (c *chanPacketConn) SetWriteDeadline(t time.Time) error {
return c.pc.SetWriteDeadline(t)
}
Loading