From 89becc067f672c9f8112cfbc84a05822829b2222 Mon Sep 17 00:00:00 2001 From: RahulRoyMattam Date: Tue, 16 Jan 2018 20:36:50 +0530 Subject: [PATCH 1/2] Multi Channel Chat --- chat.go | 85 ---------------- chat/client.go | 120 ++++++++++++++++++++++ main.go | 47 +-------- public/css/application.css | 5 + public/index.html | 17 +++- public/js/application.js | 26 ++++- redis.go | 198 ------------------------------------- redis/helper.go | 102 +++++++++++++++++++ redis/reader.go | 192 +++++++++++++++++++++++++++++++++++ redis/writer.go | 68 +++++++++++++ routes.go | 56 +++++++++++ 11 files changed, 587 insertions(+), 329 deletions(-) delete mode 100644 chat.go create mode 100644 chat/client.go delete mode 100644 redis.go create mode 100644 redis/helper.go create mode 100644 redis/reader.go create mode 100644 redis/writer.go create mode 100644 routes.go diff --git a/chat.go b/chat.go deleted file mode 100644 index ee494e5..0000000 --- a/chat.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "encoding/json" - "io" - "net/http" - - "github.com/Sirupsen/logrus" - "github.com/gorilla/websocket" - "github.com/pkg/errors" -) - -var ( - upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - } -) - -// message sent to us by the javascript client -type message struct { - Handle string `json:"handle"` - Text string `json:"text"` -} - -// validateMessage so that we know it's valid JSON and contains a Handle and -// Text -func validateMessage(data []byte) (message, error) { - var msg message - - if err := json.Unmarshal(data, &msg); err != nil { - return msg, errors.Wrap(err, "Unmarshaling message") - } - - if msg.Handle == "" && msg.Text == "" { - return msg, errors.New("Message has no Handle or Text") - } - - return msg, nil -} - -// handleWebsocket connection. -func handleWebsocket(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - m := "Unable to upgrade to websockets" - log.WithField("err", err).Println(m) - http.Error(w, m, http.StatusBadRequest) - return - } - - rr.register(ws) - - for { - mt, data, err := ws.ReadMessage() - l := log.WithFields(logrus.Fields{"mt": mt, "data": data, "err": err}) - if err != nil { - if websocket.IsCloseError(err, websocket.CloseGoingAway) || err == io.EOF { - l.Info("Websocket closed!") - break - } - l.Error("Error reading websocket message") - } - switch mt { - case websocket.TextMessage: - msg, err := validateMessage(data) - if err != nil { - l.WithFields(logrus.Fields{"msg": msg, "err": err}).Error("Invalid Message") - break - } - rw.publish(data) - default: - l.Warning("Unknown Message!") - } - } - - rr.deRegister(ws) - - ws.WriteMessage(websocket.CloseMessage, []byte{}) -} diff --git a/chat/client.go b/chat/client.go new file mode 100644 index 0000000..4c15ea0 --- /dev/null +++ b/chat/client.go @@ -0,0 +1,120 @@ +package chat + +import ( + "encoding/json" + "io" + "net/http" + "time" + + "github.com/Sirupsen/logrus" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/rmattam/go-websocket-chat-demo/redis" +) + +var ( + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + waitTimeout = time.Minute * 10 + log = logrus.WithField("cmd", "go-websocket-chat-demo-chat") +) + +//Hub struct +type Hub struct { + channel string + + receive redis.Receiver + write redis.Writer +} + +// message sent to us by the javascript client +type message struct { + Handle string `json:"handle"` + Text string `json:"text"` +} + +// SubsribeRedis : Initialize the redis routines required for pub sub. +func (c *Hub) SubsribeRedis(redisURL string, channel string) { + redisPool, err := redis.NewRedisPoolFromURL(redisURL) + if err != nil { + log.WithField("url", redisURL).Fatal("Unable to create Redis pool") + } + c.channel = channel + c.receive = redis.NewReceiver(redisPool, c.channel, ValidateRedisMessage) + c.write = redis.NewWriter(redisPool, c.channel) + go c.receive.Setup(redisURL) + go c.write.Setup(redisURL) +} + +//ValidateRedisMessage validates incoming redis messages on the chat channel. +func ValidateRedisMessage(data []byte) error { + _, e := validateMessage(data) + return e +} + +// validateMessage so that we know it's valid JSON and contains a Handle and +// Text +func validateMessage(data []byte) (message, error) { + var msg message + + if err := json.Unmarshal(data, &msg); err != nil { + return msg, errors.Wrap(err, "Unmarshaling message") + } + + if msg.Handle == "" && msg.Text == "" { + return msg, errors.New("Message has no Handle or Text") + } + + return msg, nil +} + +// HandleWebsocket connection. +func HandleWebsocket(c *Hub, w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ws, err := upgrader.Upgrade(w, r, nil) + defer func() { + ws.Close() + }() + + if err != nil { + m := "Unable to upgrade to websockets" + log.WithField("err", err).Println(m) + http.Error(w, m, http.StatusBadRequest) + return + } + + c.receive.Register(ws) + + for { + mt, data, err := ws.ReadMessage() + l := log.WithFields(logrus.Fields{"mt": mt, "err": err}) + if err != nil { + if websocket.IsCloseError(err, websocket.CloseGoingAway) || err == io.EOF { + l.Info("Websocket closed!") + break + } + l.WithField("data", data).Error("Error reading websocket message") + } + switch mt { + case websocket.TextMessage: + msg, err := validateMessage(data) + if err != nil { + l.WithFields(logrus.Fields{"data": data, "msg": msg, "err": err}).Error("Invalid Message") + break + } + l.WithField("msg", msg).Info("message from client") + c.write.Publish(data) + default: + l.WithField("data", data).Warning("Unknown Message!") + } + } + + c.receive.DeRegister(ws) + ws.WriteMessage(websocket.CloseMessage, []byte{}) +} diff --git a/main.go b/main.go index b9395dd..da47a4e 100644 --- a/main.go +++ b/main.go @@ -3,17 +3,12 @@ package main import ( "net/http" "os" - "time" "github.com/Sirupsen/logrus" - "github.com/heroku/x/redis" ) var ( - waitTimeout = time.Minute * 10 - log = logrus.WithField("cmd", "go-websocket-chat-demo") - rr redisReceiver - rw redisWriter + log = logrus.WithField("cmd", "go-websocket-chat-demo") ) func main() { @@ -26,44 +21,8 @@ func main() { if redisURL == "" { log.WithField("REDIS_URL", redisURL).Fatal("$REDIS_URL must be set") } - redisPool, err := redis.NewRedisPoolFromURL(redisURL) - if err != nil { - log.WithField("url", redisURL).Fatal("Unable to create Redis pool") - } - - rr = newRedisReceiver(redisPool) - rw = newRedisWriter(redisPool) - - go func() { - for { - waited, err := redis.WaitForAvailability(redisURL, waitTimeout, rr.wait) - if !waited || err != nil { - log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!") - } - rr.broadcast(availableMessage) - err = rr.run() - if err == nil { - break - } - log.Error(err) - } - }() - - go func() { - for { - waited, err := redis.WaitForAvailability(redisURL, waitTimeout, nil) - if !waited || err != nil { - log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!") - } - err = rw.run() - if err == nil { - break - } - log.Error(err) - } - }() - http.Handle("/", http.FileServer(http.Dir("./public"))) - http.HandleFunc("/ws", handleWebsocket) + router := NewRouter(redisURL) + http.Handle("/", router) log.Println(http.ListenAndServe(":"+port, nil)) } diff --git a/public/css/application.css b/public/css/application.css index bc3fc2a..a9a8f31 100755 --- a/public/css/application.css +++ b/public/css/application.css @@ -2,3 +2,8 @@ overflow: auto; height: 500px; } + +#chat-text2 { + overflow: auto; + height: 500px; +} diff --git a/public/index.html b/public/index.html index 3eccbda..9822461 100644 --- a/public/index.html +++ b/public/index.html @@ -24,10 +24,25 @@

Go Websocket Chat Demo

+ +
+
+ +
+
+ +
+ +
+ +
+
Fork me on GitHub diff --git a/public/js/application.js b/public/js/application.js index d4da5fc..ead3dd1 100755 --- a/public/js/application.js +++ b/public/js/application.js @@ -1,4 +1,4 @@ -var box = new ReconnectingWebSocket(location.protocol.replace("http","ws") + "//" + location.host + "/ws"); +var box = new ReconnectingWebSocket(location.protocol.replace("http","ws") + "//" + location.host + "/ws/chat1"); box.onmessage = function(message) { var data = JSON.parse(message.data); @@ -21,3 +21,27 @@ $("#input-form").on("submit", function(event) { box.send(JSON.stringify({ handle: handle, text: text })); $("#input-text")[0].value = ""; }); + + +var box2 = new ReconnectingWebSocket(location.protocol.replace("http","ws") + "//" + location.host + "/ws/chat2"); +box2.onmessage = function(message) { + var data = JSON.parse(message.data); + $("#chat-text2").append("
" + $('').text(data.handle).html() + "
" + $('').text(data.text).html() + "
"); + $("#chat-text2").stop().animate({ + scrollTop: $('#chat-text2')[0].scrollHeight + }, 800); +}; + +box2.onclose = function(){ + console.log('box2 closed'); + this.box2 = new WebSocket(box2.url); + +}; + +$("#input-form2").on("submit", function(event) { + event.preventDefault(); + var handle = $("#input-handle2")[0].value; + var text = $("#input-text2")[0].value; + box2.send(JSON.stringify({ handle: handle, text: text })); + $("#input-text2")[0].value = ""; +}); diff --git a/redis.go b/redis.go deleted file mode 100644 index 33f82c9..0000000 --- a/redis.go +++ /dev/null @@ -1,198 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/Sirupsen/logrus" - "github.com/garyburd/redigo/redis" - "github.com/gorilla/websocket" - "github.com/pkg/errors" -) - -const ( - // Channel name to use with redis - Channel = "chat" -) - -var ( - waitingMessage, availableMessage []byte - waitSleep = time.Second * 10 -) - -func init() { - var err error - waitingMessage, err = json.Marshal(message{ - Handle: "system", - Text: "Waiting for redis to be available. Messaging won't work until redis is available", - }) - if err != nil { - panic(err) - } - availableMessage, err = json.Marshal(message{ - Handle: "system", - Text: "Redis is now available & messaging is now possible", - }) - if err != nil { - panic(err) - } -} - -// redisReceiver receives messages from Redis and broadcasts them to all -// registered websocket connections that are Registered. -type redisReceiver struct { - pool *redis.Pool - - messages chan []byte - newConnections chan *websocket.Conn - rmConnections chan *websocket.Conn -} - -// newRedisReceiver creates a redisReceiver that will use the provided -// rredis.Pool. -func newRedisReceiver(pool *redis.Pool) redisReceiver { - return redisReceiver{ - pool: pool, - messages: make(chan []byte, 1000), // 1000 is arbitrary - newConnections: make(chan *websocket.Conn), - rmConnections: make(chan *websocket.Conn), - } -} - -func (rr *redisReceiver) wait(_ time.Time) error { - rr.broadcast(waitingMessage) - time.Sleep(waitSleep) - return nil -} - -// run receives pubsub messages from Redis after establishing a connection. -// When a valid message is received it is broadcast to all connected websockets -func (rr *redisReceiver) run() error { - l := log.WithField("channel", Channel) - conn := rr.pool.Get() - defer conn.Close() - psc := redis.PubSubConn{Conn: conn} - psc.Subscribe(Channel) - go rr.connHandler() - for { - switch v := psc.Receive().(type) { - case redis.Message: - l.WithField("message", string(v.Data)).Info("Redis Message Received") - if _, err := validateMessage(v.Data); err != nil { - l.WithField("err", err).Error("Error unmarshalling message from Redis") - continue - } - rr.broadcast(v.Data) - case redis.Subscription: - l.WithFields(logrus.Fields{ - "kind": v.Kind, - "count": v.Count, - }).Println("Redis Subscription Received") - case error: - return errors.Wrap(v, "Error while subscribed to Redis channel") - default: - l.WithField("v", v).Info("Unknown Redis receive during subscription") - } - } -} - -// broadcast the provided message to all connected websocket connections. -// If an error occurs while writting a message to a websocket connection it is -// closed and deregistered. -func (rr *redisReceiver) broadcast(msg []byte) { - rr.messages <- msg -} - -// register the websocket connection with the receiver. -func (rr *redisReceiver) register(conn *websocket.Conn) { - rr.newConnections <- conn -} - -// deRegister the connection by closing it and removing it from our list. -func (rr *redisReceiver) deRegister(conn *websocket.Conn) { - rr.rmConnections <- conn -} - -func (rr *redisReceiver) connHandler() { - conns := make([]*websocket.Conn, 0) - for { - select { - case msg := <-rr.messages: - for _, conn := range conns { - if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { - log.WithFields(logrus.Fields{ - "data": msg, - "err": err, - "conn": conn, - }).Error("Error writting data to connection! Closing and removing Connection") - conns = removeConn(conns, conn) - } - } - case conn := <-rr.newConnections: - conns = append(conns, conn) - case conn := <-rr.rmConnections: - conns = removeConn(conns, conn) - } - } -} - -func removeConn(conns []*websocket.Conn, remove *websocket.Conn) []*websocket.Conn { - var i int - var found bool - for i = 0; i < len(conns); i++ { - if conns[i] == remove { - found = true - break - } - } - if !found { - fmt.Printf("conns: %#v\nconn: %#v\n", conns, remove) - panic("Conn not found") - } - copy(conns[i:], conns[i+1:]) // shift down - conns[len(conns)-1] = nil // nil last element - return conns[:len(conns)-1] // truncate slice -} - -// redisWriter publishes messages to the Redis CHANNEL -type redisWriter struct { - pool *redis.Pool - messages chan []byte -} - -func newRedisWriter(pool *redis.Pool) redisWriter { - return redisWriter{ - pool: pool, - messages: make(chan []byte, 10000), - } -} - -// run the main redisWriter loop that publishes incoming messages to Redis. -func (rw *redisWriter) run() error { - conn := rw.pool.Get() - defer conn.Close() - - for data := range rw.messages { - if err := writeToRedis(conn, data); err != nil { - rw.publish(data) // attempt to redeliver later - return err - } - } - return nil -} - -func writeToRedis(conn redis.Conn, data []byte) error { - if err := conn.Send("PUBLISH", Channel, data); err != nil { - return errors.Wrap(err, "Unable to publish message to Redis") - } - if err := conn.Flush(); err != nil { - return errors.Wrap(err, "Unable to flush published message to Redis") - } - return nil -} - -// publish to Redis via channel. -func (rw *redisWriter) publish(data []byte) { - rw.messages <- data -} diff --git a/redis/helper.go b/redis/helper.go new file mode 100644 index 0000000..69c1e3a --- /dev/null +++ b/redis/helper.go @@ -0,0 +1,102 @@ +package redis + +import ( + "net/url" + "time" + + "github.com/garyburd/redigo/redis" +) + +// WaitFunc to be executed occasionally by something that is waiting. +// Should return an error to cancel the waiting +// Should also sleep some amount of time to throttle connection attempts +type WaitFunc func(time.Time, []byte) error + +// waitForAvailability of the redis server located at the provided url, timeout if the Duration passes before being able to connect +func waitForAvailability(url string, d time.Duration, waitingMessage []byte, f WaitFunc) (bool, error) { + h, _, err := parseURL(url) + if err != nil { + return false, err + } + conn := make(chan struct{}) + errs := make(chan error) + go func() { + for { + c, err := redis.Dial("tcp", h) + if err == nil { + c.Close() + conn <- struct{}{} + return + } + if f != nil { + err := f(time.Now(), waitingMessage) + if err != nil { + errs <- err + return + } + } + } + }() + select { + case err := <-errs: + return false, err + case <-conn: + return true, nil + case <-time.After(d): + return false, nil + } +} + +// ParseURL in the form of redis://h:@ec2-23-23-129-214.compute-1.amazonaws.com:25219 +// and return the host and password +func parseURL(us string) (string, string, error) { + u, err := url.Parse(us) + if err != nil { + return "", "", err + } + var password string + if u.User != nil { + password, _ = u.User.Password() + } + var host string + if u.Host == "" { + host = "localhost" + } else { + host = u.Host + } + return host, password, nil +} + +// NewRedisPoolFromURL returns a new *redigo/redis.Pool configured for the supplied url +// The url can include a password in the standard form and if so is used to AUTH against +// the redis server +func NewRedisPoolFromURL(url string) (*redis.Pool, error) { + h, p, err := parseURL(url) + if err != nil { + return nil, err + } + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", h) + if err != nil { + return nil, err + } + if p != "" { + if _, err := c.Do("AUTH", p); err != nil { + c.Close() + return nil, err + } + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Minute { + return nil + } + _, err := c.Do("PING") + return err + }, + }, nil +} diff --git a/redis/reader.go b/redis/reader.go new file mode 100644 index 0000000..e8fe994 --- /dev/null +++ b/redis/reader.go @@ -0,0 +1,192 @@ +package redis + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/Sirupsen/logrus" + "github.com/garyburd/redigo/redis" + "github.com/gorilla/websocket" + "github.com/pkg/errors" +) + +var ( + log = logrus.WithField("cmd", "go-websocket-chat-demo-redis") + waitSleep = time.Second * 10 + waitTimeout = time.Minute * 10 +) + +// message sent to us by the javascript client +type message struct { + Handle string `json:"handle"` + Text string `json:"text"` +} + +// Receiver receives messages from Redis and broadcasts them to all +// registered websocket connections that are Registered. +type Receiver struct { + pool *redis.Pool + + validateMessage func([]byte) error + channel string + messages chan []byte + newConnections chan *websocket.Conn + rmConnections chan *websocket.Conn +} + +// NewReceiver creates a redisReceiver that will use the provided +// rredis.Pool. +func NewReceiver(pool *redis.Pool, channel string, validateMessage func([]byte) error) Receiver { + return Receiver{ + pool: pool, + channel: channel, + validateMessage: validateMessage, + messages: make(chan []byte, 1000), // 1000 is arbitrary + newConnections: make(chan *websocket.Conn), + rmConnections: make(chan *websocket.Conn), + } +} + +//Wait for redis availability +func (rr *Receiver) Wait(_ time.Time, waitingMessage []byte) error { + rr.Broadcast(waitingMessage) + time.Sleep(waitSleep) + return nil +} + +//Setup spawns redis receiver service +func (rr *Receiver) Setup(redisURL string) { + for { + waitingMessage, _ := json.Marshal(message{ + Handle: "redis", + Text: "Waiting for redis to be available. Messaging won't work until redis is available", + }) + waited, err := waitForAvailability(redisURL, waitTimeout, waitingMessage, rr.Wait) + if !waited || err != nil { + log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!") + } + availableMessage, _ := json.Marshal(message{ + Handle: "redis", + Text: "Redis is now available & messaging is now possible", + }) + rr.Broadcast(availableMessage) + err = rr.Run() + if err == nil { + break + } + log.Error(err) + } +} + +// Run receives pubsub messages from Redis after establishing a connection. +// When a valid message is received it is broadcast to all connected websockets +func (rr *Receiver) Run() error { + l := log.WithField("channel", rr.channel) + if rr.channel == "" { + return errors.New("Redis channel is not set") + } + + conn := rr.pool.Get() + defer conn.Close() + psc := redis.PubSubConn{Conn: conn} + psc.Subscribe(rr.channel) + go rr.connHandler() + for { + switch v := psc.Receive().(type) { + case redis.Message: + l.WithField("message", string(v.Data)).Info("Redis Message Received") + if err := rr.validateMessage(v.Data); err != nil { + if status := validateStatusMessage(v.Data); status != nil { + l.WithField("err", err).Error("Error unmarshalling message from Redis") + continue + } + } + rr.Broadcast(v.Data) + case redis.Subscription: + l.WithFields(logrus.Fields{ + "kind": v.Kind, + "count": v.Count, + }).Println("Redis Subscription Received") + case error: + return errors.Wrap(v, "Error while subscribed to Redis channel") + default: + l.WithField("v", v).Info("Unknown Redis receive during subscription") + } + } +} + +// Broadcast the provided message to all connected websocket connections. +// If an error occurs while writting a message to a websocket connection it is +// closed and deregistered. +func (rr *Receiver) Broadcast(msg []byte) { + rr.messages <- msg +} + +// Register the websocket connection with the receiver. +func (rr *Receiver) Register(conn *websocket.Conn) { + rr.newConnections <- conn +} + +// DeRegister the connection by closing it and removing it from our list. +func (rr *Receiver) DeRegister(conn *websocket.Conn) { + rr.rmConnections <- conn +} + +func (rr *Receiver) connHandler() { + conns := make([]*websocket.Conn, 0) + defer func() { + for _, conn := range conns { + conn.Close() + } + }() + + for { + select { + case msg := <-rr.messages: + for _, conn := range conns { + if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { + log.WithFields(logrus.Fields{ + "data": msg, + "err": err, + "conn": conn, + }).Error("Error writting data to connection! Closing and removing Connection") + conns = removeConn(conns, conn) + } + } + case conn := <-rr.newConnections: + conns = append(conns, conn) + case conn := <-rr.rmConnections: + conns = removeConn(conns, conn) + } + } +} + +func removeConn(conns []*websocket.Conn, remove *websocket.Conn) []*websocket.Conn { + var i int + var found bool + for i = 0; i < len(conns); i++ { + if conns[i] == remove { + found = true + break + } + } + if !found { + fmt.Printf("conns: %#v\nconn: %#v\n", conns, remove) + panic("Conn not found") + } + copy(conns[i:], conns[i+1:]) // shift down + conns[len(conns)-1] = nil // nil last element + return conns[:len(conns)-1] // truncate slice +} + +func validateStatusMessage(data []byte) error { + var msg message + if err := json.Unmarshal(data, &msg); err != nil { + return errors.Wrap(err, "Unmarshaling message") + } + if msg.Handle == "" && msg.Text == "" { + return errors.New("Message has no Handle or Text") + } + return nil +} diff --git a/redis/writer.go b/redis/writer.go new file mode 100644 index 0000000..c438807 --- /dev/null +++ b/redis/writer.go @@ -0,0 +1,68 @@ +package redis + +import ( + "github.com/Sirupsen/logrus" + "github.com/garyburd/redigo/redis" + "github.com/pkg/errors" +) + +// Writer publishes messages to the Redis CHANNEL +type Writer struct { + pool *redis.Pool + + channel string + messages chan []byte +} + +//NewWriter creates a new redis writer and returns it +func NewWriter(pool *redis.Pool, channel string) Writer { + return Writer{ + pool: pool, + channel: channel, + messages: make(chan []byte, 10000), + } +} + +//Setup spawns the redis writer service +func (rw *Writer) Setup(redisURL string) { + for { + waited, err := waitForAvailability(redisURL, waitTimeout, nil, nil) + if !waited || err != nil { + log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!") + } + err = rw.Run() + if err == nil { + break + } + log.Error(err) + } +} + +// Run the main redisWriter loop that publishes incoming messages to Redis. +func (rw *Writer) Run() error { + conn := rw.pool.Get() + defer conn.Close() + + for data := range rw.messages { + if err := rw.writeToRedis(conn, data); err != nil { + rw.Publish(data) // attempt to redeliver later + return err + } + } + return nil +} + +func (rw *Writer) writeToRedis(conn redis.Conn, data []byte) error { + if err := conn.Send("PUBLISH", rw.channel, data); err != nil { + return errors.Wrap(err, "Unable to publish message to Redis") + } + if err := conn.Flush(); err != nil { + return errors.Wrap(err, "Unable to flush published message to Redis") + } + return nil +} + +// Publish to Redis via channel. +func (rw *Writer) Publish(data []byte) { + rw.messages <- data +} diff --git a/routes.go b/routes.go new file mode 100644 index 0000000..63f2dab --- /dev/null +++ b/routes.go @@ -0,0 +1,56 @@ +package main + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/rmattam/go-websocket-chat-demo/chat" +) + +// ChatSocket : struct to define the route parameters for the chat websocket. +type ChatSocket struct { + Name string + Pattern string + Channel string + HandlerFunc func(*chat.Hub, http.ResponseWriter, *http.Request) +} + +// ChatSockets : list of Route parameters for chat websocket configuration. +type ChatSockets []ChatSocket + +// NewRouter : Initialize the router with the parameters provided. +func NewRouter(redisURL string) *mux.Router { + r := mux.NewRouter().StrictSlash(true) + + for _, route := range chatsockets { + route := route + Hub := &chat.Hub{} + Hub.SubsribeRedis(redisURL, route.Channel) + r. + Methods("GET"). + Path(route.Pattern). + Name(route.Name). + HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + route.HandlerFunc(Hub, w, r) + }) + } + + //Setting up file servers + r.PathPrefix("/").Handler(http.StripPrefix("/", http.FileServer(http.Dir("./public")))) + return r +} + +var chatsockets = ChatSockets{ + ChatSocket{ + Name: "Chat1 Websocket Endpoint", + Pattern: "/ws/chat1", + Channel: "chat1", + HandlerFunc: chat.HandleWebsocket, + }, + ChatSocket{ + Name: "Chat1 Websocket Endpoint", + Pattern: "/ws/chat2", + Channel: "chat2", + HandlerFunc: chat.HandleWebsocket, + }, +} From fe518e1f7eca2a96a0a428d50ce38cbae2fb6399 Mon Sep 17 00:00:00 2001 From: RahulRoyMattam Date: Wed, 17 Jan 2018 19:28:05 +0530 Subject: [PATCH 2/2] simplify routes.go --- routes.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/routes.go b/routes.go index 63f2dab..eeecf0a 100644 --- a/routes.go +++ b/routes.go @@ -9,10 +9,9 @@ import ( // ChatSocket : struct to define the route parameters for the chat websocket. type ChatSocket struct { - Name string - Pattern string - Channel string - HandlerFunc func(*chat.Hub, http.ResponseWriter, *http.Request) + Name string + Pattern string + Channel string } // ChatSockets : list of Route parameters for chat websocket configuration. @@ -31,7 +30,7 @@ func NewRouter(redisURL string) *mux.Router { Path(route.Pattern). Name(route.Name). HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - route.HandlerFunc(Hub, w, r) + chat.HandleWebsocket(Hub, w, r) }) } @@ -42,15 +41,13 @@ func NewRouter(redisURL string) *mux.Router { var chatsockets = ChatSockets{ ChatSocket{ - Name: "Chat1 Websocket Endpoint", - Pattern: "/ws/chat1", - Channel: "chat1", - HandlerFunc: chat.HandleWebsocket, + Name: "Chat1 Websocket Endpoint", + Pattern: "/ws/chat1", + Channel: "chat1", }, ChatSocket{ - Name: "Chat1 Websocket Endpoint", - Pattern: "/ws/chat2", - Channel: "chat2", - HandlerFunc: chat.HandleWebsocket, + Name: "Chat2 Websocket Endpoint", + Pattern: "/ws/chat2", + Channel: "chat2", }, }