Skip to content

Commit aabe308

Browse files
authored
Merge pull request #1 from ThomasBoom89/topic-implementation
Topic implementation
2 parents 5786a05 + 8cb30f2 commit aabe308

File tree

12 files changed

+256
-84
lines changed

12 files changed

+256
-84
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
.idea
2-
data
2+
data/**
3+
!data/.gitkeep

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ Then start the docker container via
1818

1919
## Usage
2020

21-
You can publish messages with http POST on /publish message must be sent in plaintext
21+
You can publish messages with http POST on /{topic}/publish message must be sent in plaintext
2222

23-
To receive messages you can http GET on /subscribe or connect via websocket to /ws
23+
To receive messages you can http GET on /{topic}/subscribe or connect via websocket to /{topic}/ws
2424
On the websocket you need to send "next" in plaintext to get a new message
2525

2626
## License

compose.development.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ services:
99
- "3002:3000"
1010
volumes:
1111
- ./src:/app/src
12+
- ./data:/app/data
1213
- ./internal:/app/internal
1314
# - ./pkg:/app/pkg
1415
- ./.air.toml:/app/.air.toml
@@ -25,6 +26,7 @@ services:
2526
- "2345:2345"
2627
volumes:
2728
- ./src:/app/src
29+
- ./data:/app/data
2830
- ./internal:/app/internal
2931
# - ./pkg:/app/pkg
3032
- ./.air.debug.toml:/app/.air.toml

compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ services:
44
target: Production
55
container_name: application
66
restart: always
7+
volumes:
8+
- ./data:/app/data

data/.gitkeep

Whitespace-only changes.

internal/http.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,35 @@
11
package internal
22

3-
import "github.com/gofiber/fiber/v2"
3+
import (
4+
"github.com/gofiber/fiber/v2"
5+
"github.com/gofiber/fiber/v2/utils"
6+
)
47

58
type HTTP struct {
6-
app *fiber.App
7-
queue Queue[string]
9+
app *fiber.App
10+
topicManager *TopicManager
811
}
912

10-
func NewHTTP(app *fiber.App, queue Queue[string]) *HTTP {
11-
return &HTTP{app: app, queue: queue}
13+
func NewHTTP(app *fiber.App, topicManager *TopicManager) *HTTP {
14+
return &HTTP{app: app, topicManager: topicManager}
1215
}
1316

1417
func (H *HTTP) SetupRoutes() {
15-
H.app.Post("/publish", func(c *fiber.Ctx) error {
16-
H.queue.Enqueue(string(c.Body()))
18+
H.app.Post("/:topic/publish", func(c *fiber.Ctx) error {
19+
topic := Topic(c.Params("topic"))
20+
body := utils.CopyBytes(c.Body())
21+
H.topicManager.AddMessage(topic, body)
1722

1823
return c.SendStatus(fiber.StatusOK)
1924
})
2025

21-
H.app.Get("/subscribe", func(c *fiber.Ctx) error {
22-
element, err := H.queue.Dequeue()
26+
H.app.Get("/:topic/subscribe", func(c *fiber.Ctx) error {
27+
topic := Topic(c.Params("topic"))
28+
message, err := H.topicManager.GetNextMessage(topic)
2329
if err != nil {
24-
return c.SendString("")
30+
return c.Send([]byte{})
2531
}
2632

27-
return c.SendString(element)
33+
return c.Send(message)
2834
})
2935
}

internal/queue.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ import (
55
"github.com/gofiber/contrib/websocket"
66
)
77

8-
type Queue[T string | *websocket.Conn] interface {
8+
type Queue[T string | *websocket.Conn | Message | []byte] interface {
99
IsEmpty() bool
1010
Enqueue(T)
1111
Dequeue() (T, error)
1212
}
1313

14-
type SliceQueue[T string | *websocket.Conn] struct {
14+
type SliceQueue[T string | *websocket.Conn | Message | []byte] struct {
1515
data []T
1616
}
1717

18-
func NewSliceQueue[T string | *websocket.Conn]() *SliceQueue[T] {
18+
func NewSliceQueue[T string | *websocket.Conn | Message | []byte]() *SliceQueue[T] {
1919
return &SliceQueue[T]{data: nil}
2020
}
2121

@@ -37,21 +37,21 @@ func (S *SliceQueue[T]) Dequeue() (T, error) {
3737
return element, nil
3838
}
3939

40-
type LinkedList[T string | *websocket.Conn] struct {
40+
type LinkedList[T string | *websocket.Conn | Message | []byte] struct {
4141
head *Node[T]
4242
tail *Node[T]
4343
}
4444

45-
type Node[T string | *websocket.Conn] struct {
45+
type Node[T string | *websocket.Conn | Message | []byte] struct {
4646
data T
4747
next *Node[T]
4848
}
4949

50-
type LinkedListQueue[T string | *websocket.Conn] struct {
50+
type LinkedListQueue[T string | *websocket.Conn | Message | []byte] struct {
5151
data *LinkedList[T]
5252
}
5353

54-
func NewLinkedListQueue[T string | *websocket.Conn]() *LinkedListQueue[T] {
54+
func NewLinkedListQueue[T string | *websocket.Conn | Message | []byte]() *LinkedListQueue[T] {
5555
return &LinkedListQueue[T]{data: &LinkedList[T]{head: nil, tail: nil}}
5656
}
5757

internal/storage.go

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,54 +5,103 @@ import (
55
"os"
66
)
77

8+
const DataDir = "./data/"
9+
const FileExcluded = ".gitkeep"
10+
811
type Storage struct {
9-
queue Queue[string]
12+
topicManager *TopicManager
1013
}
1114

12-
func NewStorage(queue Queue[string]) *Storage {
15+
func NewStorage(topicManager *TopicManager) *Storage {
1316
return &Storage{
14-
queue: queue,
17+
topicManager: topicManager,
18+
}
19+
}
20+
21+
func (S *Storage) createStoreDirectory() {
22+
_, err := os.ReadDir(DataDir)
23+
if err != nil {
24+
if os.IsNotExist(err) {
25+
err := os.Mkdir(DataDir, 0755)
26+
if err != nil {
27+
panic(err)
28+
}
29+
} else if !os.IsExist(err) {
30+
panic(err)
31+
}
1532
}
1633
}
1734

1835
func (S *Storage) clear() {
19-
err := os.WriteFile("./data", []byte(""), 0644)
36+
dirEntries, err := os.ReadDir(DataDir)
2037
if err != nil {
2138
panic(err)
2239
}
40+
for _, entry := range dirEntries {
41+
if entry.Name() == FileExcluded {
42+
continue
43+
}
44+
os.Remove(DataDir + entry.Name())
45+
}
2346
}
2447

2548
func (S *Storage) Save() {
2649
S.clear()
50+
S.createStoreDirectory()
51+
52+
for _, topic := range S.topicManager.GetTopics() {
53+
S.saveTopic(topic)
54+
}
55+
}
2756

28-
file, err := os.OpenFile("./data", os.O_RDWR|os.O_CREATE, 0644)
57+
func (S *Storage) saveTopic(topic Topic) {
58+
file, err := os.OpenFile(string(DataDir+topic), os.O_RDWR|os.O_CREATE, 0644)
2959
if err != nil {
3060
panic(err)
3161
}
3262
defer file.Close()
33-
for !S.queue.IsEmpty() {
34-
value, _ := S.queue.Dequeue()
35-
//err := os.OpenFile("./data", []byte(value+"\n"), 0644)
36-
_, err = file.WriteString(value + "\n")
63+
for {
64+
message, err := S.topicManager.GetNextMessage(topic)
65+
if err != nil {
66+
return
67+
}
68+
_, err = file.Write(message)
3769
if err != nil {
3870
panic(err)
3971
}
72+
// Line feed
73+
_, _ = file.Write([]byte{10})
4074
}
4175
}
4276

4377
func (S *Storage) Load() {
44-
file, err := os.OpenFile("./data", os.O_RDONLY, 0644)
78+
dirEntries, err := os.ReadDir(DataDir)
79+
if err != nil {
80+
panic(err)
81+
}
82+
83+
for _, dirEntry := range dirEntries {
84+
if dirEntry.Name() == FileExcluded {
85+
continue
86+
}
87+
S.loadTopic(dirEntry)
88+
}
89+
}
90+
91+
func (S *Storage) loadTopic(dirEntry os.DirEntry) {
92+
topic := dirEntry.Name()
93+
file, err := os.OpenFile(DataDir+topic, os.O_RDONLY, 0644)
4594
if err != nil {
46-
return
95+
panic(err)
4796
}
4897
defer file.Close()
4998

5099
scanner := bufio.NewScanner(file)
51100
for scanner.Scan() {
52-
text := scanner.Text()
53-
if text == "" {
101+
text := scanner.Bytes()
102+
if len(text) <= 0 {
54103
continue
55104
}
56-
S.queue.Enqueue(text)
105+
S.topicManager.AddMessage(Topic(topic), text)
57106
}
58107
}

internal/system.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"github.com/gofiber/fiber/v2/log"
55
"os"
66
"os/signal"
7+
"syscall"
78
)
89

910
func HandleOsSignal(storage *Storage) {
1011
sigchan := make(chan os.Signal, 1)
11-
signals := []os.Signal{os.Kill, os.Interrupt}
12+
signals := []os.Signal{os.Kill, os.Interrupt, syscall.SIGTERM}
1213
signal.Notify(sigchan, signals...)
1314
<-sigchan
1415

internal/topic.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package internal
2+
3+
import (
4+
"errors"
5+
"github.com/gofiber/contrib/websocket"
6+
)
7+
8+
type TopicManager struct {
9+
messageQueue map[Topic]Queue[Message]
10+
connectionQueue map[Topic]Queue[*websocket.Conn]
11+
connections map[Topic]map[*websocket.Conn]bool
12+
topics []Topic
13+
}
14+
15+
type Topic string
16+
type Message []byte
17+
18+
func NewTopicManager() *TopicManager {
19+
return &TopicManager{
20+
messageQueue: make(map[Topic]Queue[Message]),
21+
connectionQueue: make(map[Topic]Queue[*websocket.Conn]),
22+
connections: map[Topic]map[*websocket.Conn]bool{},
23+
}
24+
}
25+
26+
func (T *TopicManager) GetNextMessage(topic Topic) (Message, error) {
27+
if _, ok := T.messageQueue[topic]; !ok {
28+
return nil, errors.New("topic not found")
29+
}
30+
31+
message, err := T.messageQueue[topic].Dequeue()
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
return message, nil
37+
}
38+
39+
func (T *TopicManager) AddMessage(topic Topic, message Message) {
40+
if _, ok := T.messageQueue[topic]; !ok {
41+
T.messageQueue[topic] = NewLinkedListQueue[Message]()
42+
T.topics = append(T.topics, topic)
43+
}
44+
T.messageQueue[topic].Enqueue(message)
45+
}
46+
47+
func (T *TopicManager) AddConnection(topic Topic, conn *websocket.Conn) {
48+
if _, ok := T.connections[topic]; !ok {
49+
T.connections[topic] = map[*websocket.Conn]bool{conn: true}
50+
} else {
51+
T.connections[topic][conn] = true
52+
}
53+
}
54+
55+
func (T *TopicManager) RemoveConnection(topic Topic, conn *websocket.Conn) {
56+
if _, ok := T.connections[topic]; !ok {
57+
return
58+
}
59+
delete(T.connections[topic], conn)
60+
}
61+
62+
func (T *TopicManager) GetTopics() []Topic {
63+
return T.topics
64+
}
65+
66+
func (T *TopicManager) GetNextConnection(topic Topic) (*websocket.Conn, error) {
67+
if T.topicNotReady(topic) {
68+
return nil, errors.New("topic not found or no data")
69+
}
70+
71+
connection, err := T.connectionQueue[topic].Dequeue()
72+
if err != nil {
73+
return nil, err
74+
}
75+
return connection, nil
76+
}
77+
78+
func (T *TopicManager) ConnectionExists(topic Topic, connection *websocket.Conn) bool {
79+
if _, ok := T.connections[topic]; !ok {
80+
return false
81+
}
82+
if _, ok := T.connections[topic][connection]; !ok {
83+
return false
84+
}
85+
86+
return true
87+
}
88+
89+
func (T *TopicManager) AddConnectionToQueue(topic Topic, connection *websocket.Conn) {
90+
if _, ok := T.connectionQueue[topic]; !ok {
91+
T.connectionQueue[topic] = NewLinkedListQueue[*websocket.Conn]()
92+
}
93+
T.connectionQueue[topic].Enqueue(connection)
94+
}
95+
96+
func (T *TopicManager) topicNotReady(topic Topic) bool {
97+
if _, ok := T.connections[topic]; !ok {
98+
return true
99+
}
100+
if _, ok := T.connectionQueue[topic]; !ok {
101+
return true
102+
}
103+
if _, ok := T.messageQueue[topic]; !ok {
104+
return true
105+
}
106+
return T.messageQueue[topic].IsEmpty() || T.connectionQueue[topic].IsEmpty() || len(T.connections[topic]) == 0
107+
}

0 commit comments

Comments
 (0)