diff --git a/.gitignore b/.gitignore index e56afa2..5348dd2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea -data +data/** +!data/.gitkeep diff --git a/README.md b/README.md index 4bae505..455528b 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,9 @@ Then start the docker container via ## Usage -You can publish messages with http POST on /publish message must be sent in plaintext +You can publish messages with http POST on /{topic}/publish message must be sent in plaintext -To receive messages you can http GET on /subscribe or connect via websocket to /ws +To receive messages you can http GET on /{topic}/subscribe or connect via websocket to /{topic}/ws On the websocket you need to send "next" in plaintext to get a new message ## License diff --git a/compose.development.yml b/compose.development.yml index 6c83694..e7947a5 100644 --- a/compose.development.yml +++ b/compose.development.yml @@ -9,6 +9,7 @@ services: - "3002:3000" volumes: - ./src:/app/src + - ./data:/app/data - ./internal:/app/internal # - ./pkg:/app/pkg - ./.air.toml:/app/.air.toml @@ -25,6 +26,7 @@ services: - "2345:2345" volumes: - ./src:/app/src + - ./data:/app/data - ./internal:/app/internal # - ./pkg:/app/pkg - ./.air.debug.toml:/app/.air.toml diff --git a/compose.yml b/compose.yml index 38f25d3..fb74525 100644 --- a/compose.yml +++ b/compose.yml @@ -4,3 +4,5 @@ services: target: Production container_name: application restart: always + volumes: + - ./data:/app/data diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/internal/http.go b/internal/http.go index ccc12c9..adc0ca0 100644 --- a/internal/http.go +++ b/internal/http.go @@ -1,29 +1,35 @@ package internal -import "github.com/gofiber/fiber/v2" +import ( + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/utils" +) type HTTP struct { - app *fiber.App - queue Queue[string] + app *fiber.App + topicManager *TopicManager } -func NewHTTP(app *fiber.App, queue Queue[string]) *HTTP { - return &HTTP{app: app, queue: queue} +func NewHTTP(app *fiber.App, topicManager *TopicManager) *HTTP { + return &HTTP{app: app, topicManager: topicManager} } func (H *HTTP) SetupRoutes() { - H.app.Post("/publish", func(c *fiber.Ctx) error { - H.queue.Enqueue(string(c.Body())) + H.app.Post("/:topic/publish", func(c *fiber.Ctx) error { + topic := Topic(c.Params("topic")) + body := utils.CopyBytes(c.Body()) + H.topicManager.AddMessage(topic, body) return c.SendStatus(fiber.StatusOK) }) - H.app.Get("/subscribe", func(c *fiber.Ctx) error { - element, err := H.queue.Dequeue() + H.app.Get("/:topic/subscribe", func(c *fiber.Ctx) error { + topic := Topic(c.Params("topic")) + message, err := H.topicManager.GetNextMessage(topic) if err != nil { - return c.SendString("") + return c.Send([]byte{}) } - return c.SendString(element) + return c.Send(message) }) } diff --git a/internal/queue.go b/internal/queue.go index ce6b7ee..0901925 100644 --- a/internal/queue.go +++ b/internal/queue.go @@ -5,17 +5,17 @@ import ( "github.com/gofiber/contrib/websocket" ) -type Queue[T string | *websocket.Conn] interface { +type Queue[T string | *websocket.Conn | Message | []byte] interface { IsEmpty() bool Enqueue(T) Dequeue() (T, error) } -type SliceQueue[T string | *websocket.Conn] struct { +type SliceQueue[T string | *websocket.Conn | Message | []byte] struct { data []T } -func NewSliceQueue[T string | *websocket.Conn]() *SliceQueue[T] { +func NewSliceQueue[T string | *websocket.Conn | Message | []byte]() *SliceQueue[T] { return &SliceQueue[T]{data: nil} } @@ -37,21 +37,21 @@ func (S *SliceQueue[T]) Dequeue() (T, error) { return element, nil } -type LinkedList[T string | *websocket.Conn] struct { +type LinkedList[T string | *websocket.Conn | Message | []byte] struct { head *Node[T] tail *Node[T] } -type Node[T string | *websocket.Conn] struct { +type Node[T string | *websocket.Conn | Message | []byte] struct { data T next *Node[T] } -type LinkedListQueue[T string | *websocket.Conn] struct { +type LinkedListQueue[T string | *websocket.Conn | Message | []byte] struct { data *LinkedList[T] } -func NewLinkedListQueue[T string | *websocket.Conn]() *LinkedListQueue[T] { +func NewLinkedListQueue[T string | *websocket.Conn | Message | []byte]() *LinkedListQueue[T] { return &LinkedListQueue[T]{data: &LinkedList[T]{head: nil, tail: nil}} } diff --git a/internal/storage.go b/internal/storage.go index e4b934d..ab0cd89 100644 --- a/internal/storage.go +++ b/internal/storage.go @@ -5,54 +5,103 @@ import ( "os" ) +const DataDir = "./data/" +const FileExcluded = ".gitkeep" + type Storage struct { - queue Queue[string] + topicManager *TopicManager } -func NewStorage(queue Queue[string]) *Storage { +func NewStorage(topicManager *TopicManager) *Storage { return &Storage{ - queue: queue, + topicManager: topicManager, + } +} + +func (S *Storage) createStoreDirectory() { + _, err := os.ReadDir(DataDir) + if err != nil { + if os.IsNotExist(err) { + err := os.Mkdir(DataDir, 0755) + if err != nil { + panic(err) + } + } else if !os.IsExist(err) { + panic(err) + } } } func (S *Storage) clear() { - err := os.WriteFile("./data", []byte(""), 0644) + dirEntries, err := os.ReadDir(DataDir) if err != nil { panic(err) } + for _, entry := range dirEntries { + if entry.Name() == FileExcluded { + continue + } + os.Remove(DataDir + entry.Name()) + } } func (S *Storage) Save() { S.clear() + S.createStoreDirectory() + + for _, topic := range S.topicManager.GetTopics() { + S.saveTopic(topic) + } +} - file, err := os.OpenFile("./data", os.O_RDWR|os.O_CREATE, 0644) +func (S *Storage) saveTopic(topic Topic) { + file, err := os.OpenFile(string(DataDir+topic), os.O_RDWR|os.O_CREATE, 0644) if err != nil { panic(err) } defer file.Close() - for !S.queue.IsEmpty() { - value, _ := S.queue.Dequeue() - //err := os.OpenFile("./data", []byte(value+"\n"), 0644) - _, err = file.WriteString(value + "\n") + for { + message, err := S.topicManager.GetNextMessage(topic) + if err != nil { + return + } + _, err = file.Write(message) if err != nil { panic(err) } + // Line feed + _, _ = file.Write([]byte{10}) } } func (S *Storage) Load() { - file, err := os.OpenFile("./data", os.O_RDONLY, 0644) + dirEntries, err := os.ReadDir(DataDir) + if err != nil { + panic(err) + } + + for _, dirEntry := range dirEntries { + if dirEntry.Name() == FileExcluded { + continue + } + S.loadTopic(dirEntry) + } +} + +func (S *Storage) loadTopic(dirEntry os.DirEntry) { + topic := dirEntry.Name() + file, err := os.OpenFile(DataDir+topic, os.O_RDONLY, 0644) if err != nil { - return + panic(err) } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { - text := scanner.Text() - if text == "" { + text := scanner.Bytes() + if len(text) <= 0 { continue } - S.queue.Enqueue(text) + S.topicManager.AddMessage(Topic(topic), text) } } diff --git a/internal/system.go b/internal/system.go index f3c42d9..b1a4969 100644 --- a/internal/system.go +++ b/internal/system.go @@ -4,11 +4,12 @@ import ( "github.com/gofiber/fiber/v2/log" "os" "os/signal" + "syscall" ) func HandleOsSignal(storage *Storage) { sigchan := make(chan os.Signal, 1) - signals := []os.Signal{os.Kill, os.Interrupt} + signals := []os.Signal{os.Kill, os.Interrupt, syscall.SIGTERM} signal.Notify(sigchan, signals...) <-sigchan diff --git a/internal/topic.go b/internal/topic.go new file mode 100644 index 0000000..abbefbb --- /dev/null +++ b/internal/topic.go @@ -0,0 +1,107 @@ +package internal + +import ( + "errors" + "github.com/gofiber/contrib/websocket" +) + +type TopicManager struct { + messageQueue map[Topic]Queue[Message] + connectionQueue map[Topic]Queue[*websocket.Conn] + connections map[Topic]map[*websocket.Conn]bool + topics []Topic +} + +type Topic string +type Message []byte + +func NewTopicManager() *TopicManager { + return &TopicManager{ + messageQueue: make(map[Topic]Queue[Message]), + connectionQueue: make(map[Topic]Queue[*websocket.Conn]), + connections: map[Topic]map[*websocket.Conn]bool{}, + } +} + +func (T *TopicManager) GetNextMessage(topic Topic) (Message, error) { + if _, ok := T.messageQueue[topic]; !ok { + return nil, errors.New("topic not found") + } + + message, err := T.messageQueue[topic].Dequeue() + if err != nil { + return nil, err + } + + return message, nil +} + +func (T *TopicManager) AddMessage(topic Topic, message Message) { + if _, ok := T.messageQueue[topic]; !ok { + T.messageQueue[topic] = NewLinkedListQueue[Message]() + T.topics = append(T.topics, topic) + } + T.messageQueue[topic].Enqueue(message) +} + +func (T *TopicManager) AddConnection(topic Topic, conn *websocket.Conn) { + if _, ok := T.connections[topic]; !ok { + T.connections[topic] = map[*websocket.Conn]bool{conn: true} + } else { + T.connections[topic][conn] = true + } +} + +func (T *TopicManager) RemoveConnection(topic Topic, conn *websocket.Conn) { + if _, ok := T.connections[topic]; !ok { + return + } + delete(T.connections[topic], conn) +} + +func (T *TopicManager) GetTopics() []Topic { + return T.topics +} + +func (T *TopicManager) GetNextConnection(topic Topic) (*websocket.Conn, error) { + if T.topicNotReady(topic) { + return nil, errors.New("topic not found or no data") + } + + connection, err := T.connectionQueue[topic].Dequeue() + if err != nil { + return nil, err + } + return connection, nil +} + +func (T *TopicManager) ConnectionExists(topic Topic, connection *websocket.Conn) bool { + if _, ok := T.connections[topic]; !ok { + return false + } + if _, ok := T.connections[topic][connection]; !ok { + return false + } + + return true +} + +func (T *TopicManager) AddConnectionToQueue(topic Topic, connection *websocket.Conn) { + if _, ok := T.connectionQueue[topic]; !ok { + T.connectionQueue[topic] = NewLinkedListQueue[*websocket.Conn]() + } + T.connectionQueue[topic].Enqueue(connection) +} + +func (T *TopicManager) topicNotReady(topic Topic) bool { + if _, ok := T.connections[topic]; !ok { + return true + } + if _, ok := T.connectionQueue[topic]; !ok { + return true + } + if _, ok := T.messageQueue[topic]; !ok { + return true + } + return T.messageQueue[topic].IsEmpty() || T.connectionQueue[topic].IsEmpty() || len(T.connections[topic]) == 0 +} diff --git a/internal/websocket.go b/internal/websocket.go index ab694b1..ce903ef 100644 --- a/internal/websocket.go +++ b/internal/websocket.go @@ -8,26 +8,26 @@ import ( ) type WebSocket struct { - app *fiber.App - messageQueue Queue[string] - connectionQueue Queue[*websocket.Conn] - connectionPool map[*websocket.Conn]bool - registerPool chan *websocket.Conn - unregisterPool chan *websocket.Conn + app *fiber.App + registerPool chan *couple + unregisterPool chan *couple + topicManager *TopicManager } -func NewWebSocket(app *fiber.App, messageQueue Queue[string], connectionQueue Queue[*websocket.Conn]) *WebSocket { - connectionPool := make(map[*websocket.Conn]bool) - registerPool := make(chan *websocket.Conn, 10) - unregisterPool := make(chan *websocket.Conn, 10) +type couple struct { + Connection *websocket.Conn + Topic Topic +} + +func NewWebSocket(app *fiber.App, topicManager *TopicManager) *WebSocket { + registerPool := make(chan *couple, 10) + unregisterPool := make(chan *couple, 10) return &WebSocket{ - app: app, - messageQueue: messageQueue, - connectionQueue: connectionQueue, - connectionPool: connectionPool, - registerPool: registerPool, - unregisterPool: unregisterPool, + app: app, + registerPool: registerPool, + unregisterPool: unregisterPool, + topicManager: topicManager, } } @@ -36,15 +36,20 @@ func (W *WebSocket) SetupRoutes() { go W.sendMessagesToClients() go W.handleConnectionRegisterUnregister() - W.app.Get("/ws", websocket.New(func(c *websocket.Conn) { + W.app.Get("/:topic/ws", websocket.New(func(c *websocket.Conn) { + topic := Topic(c.Params("topic")) + couple := &couple{ + Connection: c, + Topic: topic, + } // unregister connection defer func() { - W.unregisterPool <- c + W.unregisterPool <- couple c.Close() }() // register connection - W.registerPool <- c + W.registerPool <- couple // read messages for { _, message, err := c.ReadMessage() @@ -55,7 +60,7 @@ func (W *WebSocket) SetupRoutes() { return } if string(message) == "next" { - W.connectionQueue.Enqueue(c) + W.topicManager.AddConnectionToQueue(topic, c) } } }, websocket.Config{RecoverHandler: GetWebsocketPanicHandler()})) @@ -65,25 +70,27 @@ func (W *WebSocket) sendMessagesToClients() { defer RecoverGoroutine(W.sendMessagesToClients) for { runtime.Gosched() - if len(W.connectionPool) == 0 || W.connectionQueue.IsEmpty() || W.messageQueue.IsEmpty() { - continue - } - connection, err := W.connectionQueue.Dequeue() - if _, ok := W.connectionPool[connection]; ok == false || err != nil { - continue - } + for _, topic := range W.topicManager.GetTopics() { + connection, err := W.topicManager.GetNextConnection(topic) + if err != nil { + continue + } + exists := W.topicManager.ConnectionExists(topic, connection) + if exists == false { + continue + } - message, err := W.messageQueue.Dequeue() - if err != nil { - W.connectionQueue.Enqueue(connection) - continue - } + message, err := W.topicManager.GetNextMessage(topic) + if err != nil { + W.topicManager.AddConnectionToQueue(topic, connection) + continue + } - err = connection.WriteMessage(websocket.TextMessage, []byte(message)) - if err != nil { - W.connectionQueue.Enqueue(connection) - W.messageQueue.Enqueue(message) - return + err = connection.WriteMessage(websocket.TextMessage, message) + if err != nil { + W.topicManager.AddConnectionToQueue(topic, connection) + W.topicManager.AddMessage(topic, message) + } } } } @@ -92,12 +99,12 @@ func (W *WebSocket) handleConnectionRegisterUnregister() { defer RecoverGoroutine(W.handleConnectionRegisterUnregister) for { select { - case connection := <-W.registerPool: - W.connectionPool[connection] = true + case foo := <-W.registerPool: + W.topicManager.AddConnection(foo.Topic, foo.Connection) log.Debug("connection registered") - case connection := <-W.unregisterPool: - delete(W.connectionPool, connection) + case foo := <-W.unregisterPool: + W.topicManager.RemoveConnection(foo.Topic, foo.Connection) log.Debug("connection unregistered") } } diff --git a/src/main.go b/src/main.go index 8694629..f622335 100644 --- a/src/main.go +++ b/src/main.go @@ -2,7 +2,6 @@ package main import ( "github.com/ThomasBoom89/simple-http-message-queue/internal" - websocket2 "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/logger" @@ -10,9 +9,8 @@ import ( ) func main() { - - queue := internal.NewLinkedListQueue[string]() - storage := internal.NewStorage(queue) + topicManager := internal.NewTopicManager() + storage := internal.NewStorage(topicManager) storage.Load() defer internal.SaveOnPanic(storage) go internal.HandleOsSignal(storage) @@ -28,11 +26,10 @@ func main() { app.Use(logger.New()) app.Use(cors.New()) - http := internal.NewHTTP(app, queue) + http := internal.NewHTTP(app, topicManager) http.SetupRoutes() - queue2 := internal.NewLinkedListQueue[*websocket2.Conn]() - websocket := internal.NewWebSocket(app, queue, queue2) + websocket := internal.NewWebSocket(app, topicManager) websocket.SetupRoutes() err := app.Listen(":3000")