Distributed WebSocket 是一个支持分布式部署的 WebSocket 服务框架,基于 Go 语言实现。它通过 Redis 存储客户端连接信息,利用 gRPC 在服务节点间传递消息,实现了跨节点的实时通信。
- 支持单节点和分布式部署
- 基于 Redis 的连接信息共享
- 通过 gRPC 实现跨节点消息传递
- 高性能的 WebSocket 连接处理
- 自动连接管理和消息广播
- 客户端连接状态监控和错误处理
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ WebSocket 客户端 │────▶│ WebSocket 服务节点 │◀───▶│ Redis │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ ▲
▼ │
┌──────────────────┐
│ gRPC │
│ 服务接口 │
└──────────────────┘
系统主要组件:
- WebSocket 客户端 - 浏览器或其他设备建立 WebSocket 连接
- WebSocket 服务节点 - 承载 WebSocket 连接,处理消息收发
- Redis - 存储连接的元信息(connection_id ↔ 节点IP+端口)
- gRPC 服务 - 节点间通过 gRPC 互相发送消息
go get github.com/jayecc/go-websocket
- Go 1.19+
- Redis
- gRPC
package main
import (
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
websocket "github.com/jayecc/go-websocket"
)
func main() {
gin.SetMode(gin.ReleaseMode)
gin.DisableConsoleColor()
app := gin.Default()
// 创建WebSocket Hub
hub := websocket.NewHubRun()
defer hub.Close()
// 注册WebSocket路由
app.GET("/ws", func(ctx *gin.Context) {
client := websocket.NewClient(hub, websocket.WithId("xxxx"))
// 设置连接回调
client.OnConnect(func(conn *websocket.Client) {
log.Printf("Client %s connected", conn.GetID())
})
// 设置消息处理回调
client.OnEvent(func(conn *websocket.Client, messageType int, message []byte) {
log.Printf("Received message from client %s: %s", conn.GetID(), string(message))
// 发送服务器时间作为响应
response := time.Now().Format(time.RFC3339)
conn.Emit([]byte(response))
})
// 设置断开连接回调
client.OnDisconnect(func(id string) {
log.Printf("Client %s disconnected", id)
})
// 设置错误处理回调
client.OnError(func(id string, err error) {
log.Printf("Error from client %s: %v", id, err)
})
// 建立WebSocket连接
if err := client.Conn(ctx.Writer, ctx.Request); err != nil {
log.Printf("Failed to establish WebSocket connection: %v", err)
ctx.String(http.StatusInternalServerError, "Failed to establish WebSocket connection")
return
}
})
log.Fatal(app.Run(":8080"))
}
package main
import (
"context"
"log"
"net"
"net/http"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v8"
"github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
websocket "github.com/jayecc/go-websocket"
"github.com/jayecc/go-websocket/websocketpb"
)
func main() {
serverGroup := errgroup.Group{}
grpcAddr := ":8081"
httpAddr := ":8082"
grpcHost := websocket.IP().String() + grpcAddr
// 创建Redis客户端
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
// 创建Redis存储
storage := websocket.NewRedisStorage(redisClient, "websocket")
// 创建WebSocket Hub
websocketHub := websocket.NewHubRun()
defer websocketHub.Close()
// 创建分布式WebSocket客户端
websocketClient := websocket.NewDistClient(storage)
// 启动 gRPC 服务器
serverGroup.Go(func() error {
lis, err := net.Listen("tcp", grpcAddr)
if err != nil {
return err
}
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(grpcrecovery.UnaryServerInterceptor()),
)
// 注册 gRPC 服务
websocketpb.RegisterWebsocketServer(grpcServer, websocket.NewDistServer(websocketHub))
return grpcServer.Serve(lis)
})
// 启动 HTTP 服务器
serverGroup.Go(func() error {
gin.SetMode(gin.ReleaseMode)
gin.DisableConsoleColor()
app := gin.Default()
// 注册WebSocket路由
app.GET("/ws", func(ctx *gin.Context) {
session := websocket.NewDistSession(websocketHub, storage, grpcHost, websocket.WithId("xxxx"))
session.OnError(func(id string, err error) {
log.Printf("OnError: %v\n", err)
})
session.OnEvent(func(conn *websocket.Client, messageType int, message []byte) {
log.Printf("OnEvent: %s\n", string(message))
// 广播消息到所有节点
log.Println(websocketClient.Broadcast(context.Background(), []byte("grpc广播消息")))
})
session.OnConnect(func(conn *websocket.Client) {
log.Printf("OnConnect: %s\n", conn.GetID())
})
session.OnDisconnect(func(id string) {
log.Printf("OnDisconnect: %s\n", id)
})
if err := session.Conn(ctx.Writer, ctx.Request); err != nil {
ctx.String(http.StatusInternalServerError, "Failed to establish WebSocket connection")
return
}
})
return app.Run(httpAddr)
})
log.Println(serverGroup.Wait())
}
管理活跃的客户端连接和消息广播。
NewHub()
- 创建 Hub 实例NewHubRun()
- 创建并运行 Hub 实例Client(id string)
- 根据 ID 获取客户端Broadcast(message []byte)
- 广播消息Close()
- 关闭 Hub
表示单个 WebSocket 客户端连接。
NewClient(hub *Hub, opts ...Option)
- 创建客户端Conn(w http.ResponseWriter, r *http.Request)
- 建立 WebSocket 连接Emit(message []byte)
- 向客户端发送消息Broadcast(message []byte)
- 广播消息GetID()
- 获取客户端 IDClose()
- 关闭客户端连接
分布式会话管理器。
NewDistSession(hub *Hub, storage Storage, addr string, opts ...Option)
- 创建分布式会话OnConnect(handler func(conn *Client))
- 设置连接回调OnEvent(handler func(conn *Client, messageType int, message []byte))
- 设置消息回调OnError(handler func(id string, err error))
- 设置错误回调OnDisconnect(handler func(id string))
- 设置断开连接回调
分布式客户端,用于跨节点发送消息。
NewDistClient(storage Storage)
- 创建分布式客户端Emit(ctx context.Context, id string, message []byte)
- 向指定客户端发送消息Online(ctx context.Context, id string)
- 检查客户端是否在线Broadcast(ctx context.Context, message []byte)
- 广播消息到所有节点
存储接口,用于保存客户端连接信息。
Set(key string, value string)
- 设置键值对Get(key string)
- 获取值Del(key ...string)
- 删除键Clear(host string)
- 清理指定主机的连接All()
- 获取所有连接信息
项目提供了 gRPC 服务接口,用于节点间通信:
Emit
- 向指定客户端发送消息Online
- 检查客户端是否在线Broadcast
- 广播消息
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
storage := websocket.NewRedisStorage(redisClient, "websocket")
- 错误处理:始终实现错误回调以监控连接问题
- 资源清理:使用
defer
确保 Hub 和连接正确关闭 - 超时控制:为 gRPC 调用设置合适的超时时间
- 重试机制:对于关键操作实现重试逻辑
- 监控日志:记录连接状态和消息处理日志
MIT