Skip to content

Commit 8cb30f2

Browse files
committed
Feature: refactor storage to save/load messages by topic
1 parent b614978 commit 8cb30f2

File tree

4 files changed

+69
-23
lines changed

4 files changed

+69
-23
lines changed

internal/storage.go

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,54 +5,103 @@ import (
55
"os"
66
)
77

8+
const DataDir = "./data/"
9+
const FileExcluded = ".gitkeep"
10+
811
type Storage struct {
9-
queue Queue[string]
12+
topicManager *TopicManager
1013
}
1114

12-
func NewStorage(queue Queue[string]) *Storage {
15+
func NewStorage(topicManager *TopicManager) *Storage {
1316
return &Storage{
14-
queue: queue,
17+
topicManager: topicManager,
18+
}
19+
}
20+
21+
func (S *Storage) createStoreDirectory() {
22+
_, err := os.ReadDir(DataDir)
23+
if err != nil {
24+
if os.IsNotExist(err) {
25+
err := os.Mkdir(DataDir, 0755)
26+
if err != nil {
27+
panic(err)
28+
}
29+
} else if !os.IsExist(err) {
30+
panic(err)
31+
}
1532
}
1633
}
1734

1835
func (S *Storage) clear() {
19-
err := os.WriteFile("./data", []byte(""), 0644)
36+
dirEntries, err := os.ReadDir(DataDir)
2037
if err != nil {
2138
panic(err)
2239
}
40+
for _, entry := range dirEntries {
41+
if entry.Name() == FileExcluded {
42+
continue
43+
}
44+
os.Remove(DataDir + entry.Name())
45+
}
2346
}
2447

2548
func (S *Storage) Save() {
2649
S.clear()
50+
S.createStoreDirectory()
51+
52+
for _, topic := range S.topicManager.GetTopics() {
53+
S.saveTopic(topic)
54+
}
55+
}
2756

28-
file, err := os.OpenFile("./data", os.O_RDWR|os.O_CREATE, 0644)
57+
func (S *Storage) saveTopic(topic Topic) {
58+
file, err := os.OpenFile(string(DataDir+topic), os.O_RDWR|os.O_CREATE, 0644)
2959
if err != nil {
3060
panic(err)
3161
}
3262
defer file.Close()
33-
for !S.queue.IsEmpty() {
34-
value, _ := S.queue.Dequeue()
35-
//err := os.OpenFile("./data", []byte(value+"\n"), 0644)
36-
_, err = file.WriteString(value + "\n")
63+
for {
64+
message, err := S.topicManager.GetNextMessage(topic)
65+
if err != nil {
66+
return
67+
}
68+
_, err = file.Write(message)
3769
if err != nil {
3870
panic(err)
3971
}
72+
// Line feed
73+
_, _ = file.Write([]byte{10})
4074
}
4175
}
4276

4377
func (S *Storage) Load() {
44-
file, err := os.OpenFile("./data", os.O_RDONLY, 0644)
78+
dirEntries, err := os.ReadDir(DataDir)
79+
if err != nil {
80+
panic(err)
81+
}
82+
83+
for _, dirEntry := range dirEntries {
84+
if dirEntry.Name() == FileExcluded {
85+
continue
86+
}
87+
S.loadTopic(dirEntry)
88+
}
89+
}
90+
91+
func (S *Storage) loadTopic(dirEntry os.DirEntry) {
92+
topic := dirEntry.Name()
93+
file, err := os.OpenFile(DataDir+topic, os.O_RDONLY, 0644)
4594
if err != nil {
46-
return
95+
panic(err)
4796
}
4897
defer file.Close()
4998

5099
scanner := bufio.NewScanner(file)
51100
for scanner.Scan() {
52-
text := scanner.Text()
53-
if text == "" {
101+
text := scanner.Bytes()
102+
if len(text) <= 0 {
54103
continue
55104
}
56-
S.queue.Enqueue(text)
105+
S.topicManager.AddMessage(Topic(topic), text)
57106
}
58107
}

internal/system.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"github.com/gofiber/fiber/v2/log"
55
"os"
66
"os/signal"
7+
"syscall"
78
)
89

910
func HandleOsSignal(storage *Storage) {
1011
sigchan := make(chan os.Signal, 1)
11-
signals := []os.Signal{os.Kill, os.Interrupt}
12+
signals := []os.Signal{os.Kill, os.Interrupt, syscall.SIGTERM}
1213
signal.Notify(sigchan, signals...)
1314
<-sigchan
1415

internal/topic.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ type TopicManager struct {
99
messageQueue map[Topic]Queue[Message]
1010
connectionQueue map[Topic]Queue[*websocket.Conn]
1111
connections map[Topic]map[*websocket.Conn]bool
12+
topics []Topic
1213
}
1314

1415
type Topic string
@@ -38,6 +39,7 @@ func (T *TopicManager) GetNextMessage(topic Topic) (Message, error) {
3839
func (T *TopicManager) AddMessage(topic Topic, message Message) {
3940
if _, ok := T.messageQueue[topic]; !ok {
4041
T.messageQueue[topic] = NewLinkedListQueue[Message]()
42+
T.topics = append(T.topics, topic)
4143
}
4244
T.messageQueue[topic].Enqueue(message)
4345
}
@@ -58,12 +60,7 @@ func (T *TopicManager) RemoveConnection(topic Topic, conn *websocket.Conn) {
5860
}
5961

6062
func (T *TopicManager) GetTopics() []Topic {
61-
topics := make([]Topic, 0)
62-
for topic := range T.messageQueue {
63-
topics = append(topics, topic)
64-
}
65-
66-
return topics
63+
return T.topics
6764
}
6865

6966
func (T *TopicManager) GetNextConnection(topic Topic) (*websocket.Conn, error) {

src/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ import (
1010

1111
func main() {
1212
topicManager := internal.NewTopicManager()
13-
queue := internal.NewLinkedListQueue[string]()
14-
storage := internal.NewStorage(queue)
13+
storage := internal.NewStorage(topicManager)
1514
storage.Load()
1615
defer internal.SaveOnPanic(storage)
1716
go internal.HandleOsSignal(storage)

0 commit comments

Comments
 (0)