Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea
data
data/**
!data/.gitkeep
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Then start the docker container via

## Usage

You can publish messages with http POST on /publish message must be sent in plaintext
You can publish messages with http POST on /{topic}/publish message must be sent in plaintext

To receive messages you can http GET on /subscribe or connect via websocket to /ws
To receive messages you can http GET on /{topic}/subscribe or connect via websocket to /{topic}/ws
On the websocket you need to send "next" in plaintext to get a new message

## License
Expand Down
2 changes: 2 additions & 0 deletions compose.development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ services:
- "3002:3000"
volumes:
- ./src:/app/src
- ./data:/app/data
- ./internal:/app/internal
# - ./pkg:/app/pkg
- ./.air.toml:/app/.air.toml
Expand All @@ -25,6 +26,7 @@ services:
- "2345:2345"
volumes:
- ./src:/app/src
- ./data:/app/data
- ./internal:/app/internal
# - ./pkg:/app/pkg
- ./.air.debug.toml:/app/.air.toml
Expand Down
2 changes: 2 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ services:
target: Production
container_name: application
restart: always
volumes:
- ./data:/app/data
Empty file added data/.gitkeep
Empty file.
28 changes: 17 additions & 11 deletions internal/http.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
package internal

import "github.com/gofiber/fiber/v2"
import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/utils"
)

type HTTP struct {
app *fiber.App
queue Queue[string]
app *fiber.App
topicManager *TopicManager
}

func NewHTTP(app *fiber.App, queue Queue[string]) *HTTP {
return &HTTP{app: app, queue: queue}
func NewHTTP(app *fiber.App, topicManager *TopicManager) *HTTP {
return &HTTP{app: app, topicManager: topicManager}
}

func (H *HTTP) SetupRoutes() {
H.app.Post("/publish", func(c *fiber.Ctx) error {
H.queue.Enqueue(string(c.Body()))
H.app.Post("/:topic/publish", func(c *fiber.Ctx) error {
topic := Topic(c.Params("topic"))
body := utils.CopyBytes(c.Body())
H.topicManager.AddMessage(topic, body)

return c.SendStatus(fiber.StatusOK)
})

H.app.Get("/subscribe", func(c *fiber.Ctx) error {
element, err := H.queue.Dequeue()
H.app.Get("/:topic/subscribe", func(c *fiber.Ctx) error {
topic := Topic(c.Params("topic"))
message, err := H.topicManager.GetNextMessage(topic)
if err != nil {
return c.SendString("")
return c.Send([]byte{})
}

return c.SendString(element)
return c.Send(message)
})
}
14 changes: 7 additions & 7 deletions internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"github.com/gofiber/contrib/websocket"
)

type Queue[T string | *websocket.Conn] interface {
type Queue[T string | *websocket.Conn | Message | []byte] interface {
IsEmpty() bool
Enqueue(T)
Dequeue() (T, error)
}

type SliceQueue[T string | *websocket.Conn] struct {
type SliceQueue[T string | *websocket.Conn | Message | []byte] struct {
data []T
}

func NewSliceQueue[T string | *websocket.Conn]() *SliceQueue[T] {
func NewSliceQueue[T string | *websocket.Conn | Message | []byte]() *SliceQueue[T] {
return &SliceQueue[T]{data: nil}
}

Expand All @@ -37,21 +37,21 @@ func (S *SliceQueue[T]) Dequeue() (T, error) {
return element, nil
}

type LinkedList[T string | *websocket.Conn] struct {
type LinkedList[T string | *websocket.Conn | Message | []byte] struct {
head *Node[T]
tail *Node[T]
}

type Node[T string | *websocket.Conn] struct {
type Node[T string | *websocket.Conn | Message | []byte] struct {
data T
next *Node[T]
}

type LinkedListQueue[T string | *websocket.Conn] struct {
type LinkedListQueue[T string | *websocket.Conn | Message | []byte] struct {
data *LinkedList[T]
}

func NewLinkedListQueue[T string | *websocket.Conn]() *LinkedListQueue[T] {
func NewLinkedListQueue[T string | *websocket.Conn | Message | []byte]() *LinkedListQueue[T] {
return &LinkedListQueue[T]{data: &LinkedList[T]{head: nil, tail: nil}}
}

Expand Down
77 changes: 63 additions & 14 deletions internal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,103 @@ import (
"os"
)

const DataDir = "./data/"
const FileExcluded = ".gitkeep"

type Storage struct {
queue Queue[string]
topicManager *TopicManager
}

func NewStorage(queue Queue[string]) *Storage {
func NewStorage(topicManager *TopicManager) *Storage {
return &Storage{
queue: queue,
topicManager: topicManager,
}
}

func (S *Storage) createStoreDirectory() {
_, err := os.ReadDir(DataDir)
if err != nil {
if os.IsNotExist(err) {
err := os.Mkdir(DataDir, 0755)
if err != nil {
panic(err)
}
} else if !os.IsExist(err) {
panic(err)
}
}
}

func (S *Storage) clear() {
err := os.WriteFile("./data", []byte(""), 0644)
dirEntries, err := os.ReadDir(DataDir)
if err != nil {
panic(err)
}
for _, entry := range dirEntries {
if entry.Name() == FileExcluded {
continue
}
os.Remove(DataDir + entry.Name())
}
}

func (S *Storage) Save() {
S.clear()
S.createStoreDirectory()

for _, topic := range S.topicManager.GetTopics() {
S.saveTopic(topic)
}
}

file, err := os.OpenFile("./data", os.O_RDWR|os.O_CREATE, 0644)
func (S *Storage) saveTopic(topic Topic) {
file, err := os.OpenFile(string(DataDir+topic), os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
panic(err)
}
defer file.Close()
for !S.queue.IsEmpty() {
value, _ := S.queue.Dequeue()
//err := os.OpenFile("./data", []byte(value+"\n"), 0644)
_, err = file.WriteString(value + "\n")
for {
message, err := S.topicManager.GetNextMessage(topic)
if err != nil {
return
}
_, err = file.Write(message)
if err != nil {
panic(err)
}
// Line feed
_, _ = file.Write([]byte{10})
}
}

func (S *Storage) Load() {
file, err := os.OpenFile("./data", os.O_RDONLY, 0644)
dirEntries, err := os.ReadDir(DataDir)
if err != nil {
panic(err)
}

for _, dirEntry := range dirEntries {
if dirEntry.Name() == FileExcluded {
continue
}
S.loadTopic(dirEntry)
}
}

func (S *Storage) loadTopic(dirEntry os.DirEntry) {
topic := dirEntry.Name()
file, err := os.OpenFile(DataDir+topic, os.O_RDONLY, 0644)
if err != nil {
return
panic(err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
text := scanner.Text()
if text == "" {
text := scanner.Bytes()
if len(text) <= 0 {
continue
}
S.queue.Enqueue(text)
S.topicManager.AddMessage(Topic(topic), text)
}
}
3 changes: 2 additions & 1 deletion internal/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"github.com/gofiber/fiber/v2/log"
"os"
"os/signal"
"syscall"
)

func HandleOsSignal(storage *Storage) {
sigchan := make(chan os.Signal, 1)
signals := []os.Signal{os.Kill, os.Interrupt}
signals := []os.Signal{os.Kill, os.Interrupt, syscall.SIGTERM}
signal.Notify(sigchan, signals...)
<-sigchan

Expand Down
107 changes: 107 additions & 0 deletions internal/topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package internal

import (
"errors"
"github.com/gofiber/contrib/websocket"
)

type TopicManager struct {
messageQueue map[Topic]Queue[Message]
connectionQueue map[Topic]Queue[*websocket.Conn]
connections map[Topic]map[*websocket.Conn]bool
topics []Topic
}

type Topic string
type Message []byte

func NewTopicManager() *TopicManager {
return &TopicManager{
messageQueue: make(map[Topic]Queue[Message]),
connectionQueue: make(map[Topic]Queue[*websocket.Conn]),
connections: map[Topic]map[*websocket.Conn]bool{},
}
}

func (T *TopicManager) GetNextMessage(topic Topic) (Message, error) {
if _, ok := T.messageQueue[topic]; !ok {
return nil, errors.New("topic not found")
}

message, err := T.messageQueue[topic].Dequeue()
if err != nil {
return nil, err
}

return message, nil
}

func (T *TopicManager) AddMessage(topic Topic, message Message) {
if _, ok := T.messageQueue[topic]; !ok {
T.messageQueue[topic] = NewLinkedListQueue[Message]()
T.topics = append(T.topics, topic)
}
T.messageQueue[topic].Enqueue(message)
}

func (T *TopicManager) AddConnection(topic Topic, conn *websocket.Conn) {
if _, ok := T.connections[topic]; !ok {
T.connections[topic] = map[*websocket.Conn]bool{conn: true}
} else {
T.connections[topic][conn] = true
}
}

func (T *TopicManager) RemoveConnection(topic Topic, conn *websocket.Conn) {
if _, ok := T.connections[topic]; !ok {
return
}
delete(T.connections[topic], conn)
}

func (T *TopicManager) GetTopics() []Topic {
return T.topics
}

func (T *TopicManager) GetNextConnection(topic Topic) (*websocket.Conn, error) {
if T.topicNotReady(topic) {
return nil, errors.New("topic not found or no data")
}

connection, err := T.connectionQueue[topic].Dequeue()
if err != nil {
return nil, err
}
return connection, nil
}

func (T *TopicManager) ConnectionExists(topic Topic, connection *websocket.Conn) bool {
if _, ok := T.connections[topic]; !ok {
return false
}
if _, ok := T.connections[topic][connection]; !ok {
return false
}

return true
}

func (T *TopicManager) AddConnectionToQueue(topic Topic, connection *websocket.Conn) {
if _, ok := T.connectionQueue[topic]; !ok {
T.connectionQueue[topic] = NewLinkedListQueue[*websocket.Conn]()
}
T.connectionQueue[topic].Enqueue(connection)
}

func (T *TopicManager) topicNotReady(topic Topic) bool {
if _, ok := T.connections[topic]; !ok {
return true
}
if _, ok := T.connectionQueue[topic]; !ok {
return true
}
if _, ok := T.messageQueue[topic]; !ok {
return true
}
return T.messageQueue[topic].IsEmpty() || T.connectionQueue[topic].IsEmpty() || len(T.connections[topic]) == 0
}
Loading