From ea3c096ea9d37f667b7531f5199d7f40c4820584 Mon Sep 17 00:00:00 2001 From: William Clark Date: Wed, 13 Dec 2023 08:15:23 +0000 Subject: [PATCH] simplify websockets --- api/api.go | 18 +++---- api/service.go | 26 ++++++---- api/websocket.go | 115 +++++------------------------------------ static/data-updater.js | 20 +++---- 4 files changed, 46 insertions(+), 133 deletions(-) diff --git a/api/api.go b/api/api.go index 70d3734..d205d47 100644 --- a/api/api.go +++ b/api/api.go @@ -54,27 +54,23 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter { } client := &Client{ - socket: conn, - send: make(chan []byte), - manager: manager, - all: false, - channels: false, - config: false, + socket: conn, + send: make(chan []byte), + manager: manager, } manager.register <- client - go client.read() - go client.write() + go client.writer() }) + // start web socket service that dumps new data to websocket chan + go startService(manager, corePort) + adapter.router.POST("/v1/config/device", adapter.SetConfigDeviceHandler) adapter.router.POST("/v1/config/channel/:id", adapter.SetConfigChannelByIdHandler) adapter.router.POST("/v1/config/db", adapter.SetConfigDBHandler) - // start service that dumps new data to websocket chan - go startService(manager, corePort) - // TH7 demo code. html file and javascript adapter.router.GET("/", adapter.IndexHandler) adapter.router.Static("/assets", "./static") diff --git a/api/service.go b/api/service.go index 93c43cf..a1b90a9 100644 --- a/api/service.go +++ b/api/service.go @@ -10,31 +10,35 @@ import ( ) const ( - CheckChannelDur = time.Millisecond * 700 + CheckChannelDur = time.Millisecond * 1200 StartupSleepDur = time.Second * 5 ) -type ChannelsWrapper struct { - Channels []core.Channel `json:"channels"` +type Wrapper struct { + Channels []core.Channel `json:"channels"` + Ratio core.Ratio `json:"ratio"` + Timestamp string `json:"time"` } +// Regularly get the newest data and bung it out on ws func startService(man *Manager, core ports.CorePort) { - var chwrap ChannelsWrapper + var wrap Wrapper time.Sleep(StartupSleepDur) for { - chwrap.Channels = core.GetChannels() - marshaled, err := json.Marshal(chwrap) + wrap.Channels = core.GetChannels() + wrap.Ratio = core.GetRatio() + wrap.Timestamp = time.Now().Format(time.RFC1123) + + marshaled, err := json.Marshal(wrap) if err != nil { - log.Println("Error marshaling struct:", chwrap.Channels) + log.Println("Error marshaling struct:", wrap) } - man.broadcast <- Message{ - messageType: CHANNELS, - data: []byte(marshaled), - } + man.broadcast <- []byte(marshaled) + time.Sleep(CheckChannelDur) } diff --git a/api/websocket.go b/api/websocket.go index 323fcbf..eefecf5 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -1,10 +1,7 @@ package api import ( - "fmt" - "log" "net/http" - "strings" "github.com/gorilla/websocket" ) @@ -17,31 +14,15 @@ var upgrader = websocket.Upgrader{ }, } -type MessageType = int - -const ( - ALL MessageType = iota - CHANNELS - CONFIG -) - -type Message struct { - messageType MessageType - data []byte -} - type Client struct { - socket *websocket.Conn - send chan []byte - manager *Manager - channels bool - config bool - all bool + socket *websocket.Conn + send chan []byte + manager *Manager } type Manager struct { clients map[*Client]bool - broadcast chan Message + broadcast chan []byte register chan *Client unregister chan *Client } @@ -49,50 +30,12 @@ type Manager struct { func NewManager() *Manager { return &Manager{ clients: make(map[*Client]bool), - broadcast: make(chan Message), + broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), } } -func (man *Manager) BroadcastAll(msg []byte) { - for client := range man.clients { - select { - case client.send <- msg: - default: - man.unregister <- client - } - } -} - -func (man *Manager) BroadcastChannels(msg []byte) { - for client := range man.clients { - if !client.all && !client.channels { - continue - } - - select { - case client.send <- msg: - default: - man.unregister <- client - } - } -} - -func (man *Manager) BroadcastConfig(msg []byte) { - for client := range man.clients { - if !client.all && !client.config { - continue - } - - select { - case client.send <- msg: - default: - man.unregister <- client - } - } -} - func (man *Manager) start() { for { select { @@ -103,56 +46,25 @@ func (man *Manager) start() { // connection closed case conn := <-man.unregister: if _, ok := man.clients[conn]; ok { - fmt.Println("Unregistering:", conn) close(conn.send) delete(man.clients, conn) } // broadcast a message to all connected clients case message := <-man.broadcast: - switch message.messageType { - case ALL: - man.BroadcastAll(message.data) - case CHANNELS: - man.BroadcastChannels(message.data) - case CONFIG: - man.BroadcastConfig(message.data) + for client := range man.clients { + select { + case client.send <- message: + default: + man.unregister <- client + } } - } } } -func (c *Client) read() { - - defer func() { - c.manager.unregister <- c - _ = c.socket.Close() - }() - - for { - _, message, err := c.socket.ReadMessage() - if err != nil { - c.manager.unregister <- c - _ = c.socket.Close() - break - } - - switch strings.ToLower(string(message)) { - case "all": - c.all = true - case "channels": - c.channels = true - case "config": - c.config = true - default: - log.Println("received: ", message) - } - } -} - -func (c *Client) write() { +func (c *Client) writer() { defer func() { _ = c.socket.Close() @@ -163,12 +75,13 @@ func (c *Client) write() { case message, ok := <-c.send: if !ok { _ = c.socket.WriteMessage(websocket.CloseMessage, []byte{}) + c.manager.unregister <- c return } err := c.socket.WriteMessage(websocket.TextMessage, message) if err != nil { - fmt.Println(err) + c.manager.unregister <- c return } } diff --git a/static/data-updater.js b/static/data-updater.js index 712fad1..8648a09 100644 --- a/static/data-updater.js +++ b/static/data-updater.js @@ -91,16 +91,16 @@ setInterval(updateRead, 1000); setInterval(updateConfig, 10000); // websocket demo -const ws = new WebSocket("ws://" + window.location.host + "/ws"); -ws.onopen = (e) => { +const ws = new WebSocket(`ws://${window.location.host}/ws`); + +ws.addEventListener("open", (event) => { console.log("Connection open"); - ws.send("ALL"); -}; +}); -ws.onmessage = (e) => { - console.log("msg recv: " + e.data); -}; +ws.addEventListener("message", (event) => { + console.log(`Message received: ${event.data}`); +}); -ws.onclose = (e) => { - console.log("Connection close"); -}; +ws.addEventListener("close", (event) => { + console.log("Connection closed"); +});