From 187238c7e4e22d625ec0bd7888b62d085226ffee Mon Sep 17 00:00:00 2001 From: William Clark Date: Tue, 12 Dec 2023 21:07:04 +0000 Subject: [PATCH] websockets! --- api/{http/gin.go => api.go} | 33 ++++++- api/{http => }/config.go | 6 +- api/{http => }/data.go | 2 +- api/service.go | 41 +++++++++ api/{http => }/util.go | 2 +- api/websocket.go | 177 ++++++++++++++++++++++++++++++++++++ config/config.go | 7 +- data/config/data.go | 2 +- go.mod | 1 + go.sum | 2 + main.go | 8 +- static/data-updater.js | 15 +++ 12 files changed, 279 insertions(+), 17 deletions(-) rename api/{http/gin.go => api.go} (72%) rename api/{http => }/config.go (98%) rename api/{http => }/data.go (99%) create mode 100644 api/service.go rename api/{http => }/util.go (96%) create mode 100644 api/websocket.go diff --git a/api/http/gin.go b/api/api.go similarity index 72% rename from api/http/gin.go rename to api/api.go index 7dab07c..70d3734 100644 --- a/api/http/gin.go +++ b/api/api.go @@ -1,7 +1,8 @@ -package http +package api import ( "fmt" + "log" "github.com/gin-gonic/gin" @@ -24,13 +25,17 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter { var adapter GinAdapter adapter.corePort = corePort - adapter.port = cfg.API[0].Port + adapter.port = cfg.API.Port adapter.cfg = cfg //gin.SetMode(gin.ReleaseMode) adapter.router = gin.New() + // websockets + manager := NewManager() + go manager.start() + // API adapter.router.GET("/v1/data/channel/:id", adapter.GetChannelByIDHandler) adapter.router.GET("/v1/data/channels", adapter.GetChannelsHandler) @@ -42,10 +47,34 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter { adapter.router.GET("/v1/config/device", adapter.GetConfigDeviceHandler) adapter.router.GET("/v1/config/db", adapter.GetConfigDBHandler) + adapter.router.GET("/ws", func(c *gin.Context) { + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + log.Println(err) + } + + client := &Client{ + socket: conn, + send: make(chan []byte), + manager: manager, + all: false, + channels: false, + config: false, + } + + manager.register <- client + + go client.read() + go client.write() + }) + adapter.router.POST("/v1/config/device", adapter.SetConfigDeviceHandler) adapter.router.POST("/v1/config/channel/:id", adapter.SetConfigChannelByIdHandler) adapter.router.POST("/v1/config/db", adapter.SetConfigDBHandler) + // start service that dumps new data to websocket chan + go startService(manager, corePort) + // TH7 demo code. html file and javascript adapter.router.GET("/", adapter.IndexHandler) adapter.router.Static("/assets", "./static") diff --git a/api/http/config.go b/api/config.go similarity index 98% rename from api/http/config.go rename to api/config.go index 17ac09a..b1ade5a 100644 --- a/api/http/config.go +++ b/api/config.go @@ -1,4 +1,4 @@ -package http +package api import ( "errors" @@ -192,7 +192,7 @@ func (g *GinAdapter) GetConfigDeviceHandler(c *gin.Context) { func (g *GinAdapter) GetConfigDBHandler(c *gin.Context) { var timestamp string - if g.cfg.API[0].Restricted { + if g.cfg.API.Restricted { c.String(http.StatusForbidden, ErrConfigRestrictedDB.Error()) return } @@ -275,7 +275,7 @@ func (g *GinAdapter) SetConfigDBHandler(c *gin.Context) { var req PostRequest - if g.cfg.API[0].Restricted { + if g.cfg.API.Restricted { c.String(http.StatusForbidden, ErrConfigRestrictedDB.Error()) return } diff --git a/api/http/data.go b/api/data.go similarity index 99% rename from api/http/data.go rename to api/data.go index 8219b9d..0efc541 100644 --- a/api/http/data.go +++ b/api/data.go @@ -1,4 +1,4 @@ -package http +package api import ( "net/http" diff --git a/api/service.go b/api/service.go new file mode 100644 index 0000000..93c43cf --- /dev/null +++ b/api/service.go @@ -0,0 +1,41 @@ +package api + +import ( + "log" + "th7/data/core" + "th7/ports" + "time" + + "encoding/json" +) + +const ( + CheckChannelDur = time.Millisecond * 700 + StartupSleepDur = time.Second * 5 +) + +type ChannelsWrapper struct { + Channels []core.Channel `json:"channels"` +} + +func startService(man *Manager, core ports.CorePort) { + + var chwrap ChannelsWrapper + + time.Sleep(StartupSleepDur) + + for { + chwrap.Channels = core.GetChannels() + marshaled, err := json.Marshal(chwrap) + if err != nil { + log.Println("Error marshaling struct:", chwrap.Channels) + } + + man.broadcast <- Message{ + messageType: CHANNELS, + data: []byte(marshaled), + } + time.Sleep(CheckChannelDur) + } + +} diff --git a/api/http/util.go b/api/util.go similarity index 96% rename from api/http/util.go rename to api/util.go index 8ecc9ce..cd66c6e 100644 --- a/api/http/util.go +++ b/api/util.go @@ -1,4 +1,4 @@ -package http +package api import "time" diff --git a/api/websocket.go b/api/websocket.go new file mode 100644 index 0000000..323fcbf --- /dev/null +++ b/api/websocket.go @@ -0,0 +1,177 @@ +package api + +import ( + "fmt" + "log" + "net/http" + "strings" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type MessageType = int + +const ( + ALL MessageType = iota + CHANNELS + CONFIG +) + +type Message struct { + messageType MessageType + data []byte +} + +type Client struct { + socket *websocket.Conn + send chan []byte + manager *Manager + channels bool + config bool + all bool +} + +type Manager struct { + clients map[*Client]bool + broadcast chan Message + register chan *Client + unregister chan *Client +} + +func NewManager() *Manager { + return &Manager{ + clients: make(map[*Client]bool), + broadcast: make(chan Message), + register: make(chan *Client), + unregister: make(chan *Client), + } +} + +func (man *Manager) BroadcastAll(msg []byte) { + for client := range man.clients { + select { + case client.send <- msg: + default: + man.unregister <- client + } + } +} + +func (man *Manager) BroadcastChannels(msg []byte) { + for client := range man.clients { + if !client.all && !client.channels { + continue + } + + select { + case client.send <- msg: + default: + man.unregister <- client + } + } +} + +func (man *Manager) BroadcastConfig(msg []byte) { + for client := range man.clients { + if !client.all && !client.config { + continue + } + + select { + case client.send <- msg: + default: + man.unregister <- client + } + } +} + +func (man *Manager) start() { + for { + select { + // new connection + case conn := <-man.register: + man.clients[conn] = true + + // connection closed + case conn := <-man.unregister: + if _, ok := man.clients[conn]; ok { + fmt.Println("Unregistering:", conn) + close(conn.send) + delete(man.clients, conn) + } + + // broadcast a message to all connected clients + case message := <-man.broadcast: + switch message.messageType { + case ALL: + man.BroadcastAll(message.data) + case CHANNELS: + man.BroadcastChannels(message.data) + case CONFIG: + man.BroadcastConfig(message.data) + } + + } + + } +} + +func (c *Client) read() { + + defer func() { + c.manager.unregister <- c + _ = c.socket.Close() + }() + + for { + _, message, err := c.socket.ReadMessage() + if err != nil { + c.manager.unregister <- c + _ = c.socket.Close() + break + } + + switch strings.ToLower(string(message)) { + case "all": + c.all = true + case "channels": + c.channels = true + case "config": + c.config = true + default: + log.Println("received: ", message) + } + } +} + +func (c *Client) write() { + + defer func() { + _ = c.socket.Close() + }() + + for { + select { + case message, ok := <-c.send: + if !ok { + _ = c.socket.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + err := c.socket.WriteMessage(websocket.TextMessage, message) + if err != nil { + fmt.Println(err) + return + } + } + } + +} diff --git a/config/config.go b/config/config.go index 02d5657..71bd8de 100644 --- a/config/config.go +++ b/config/config.go @@ -59,14 +59,11 @@ func Load() (config.Config, error) { cfg.Board.NoWeb = v.GetBool("TH7.noweb") cfg.Channels = make([]config.Channel, 0) - cfg.API = make([]config.API, 0) - - // just REST API for now .. - cfg.API = append(cfg.API, config.API{ + cfg.API = config.API{ Port: v.GetInt("API.HTTP.port"), Enabled: v.GetBool("API.HTTP.enabled"), Restricted: v.GetBool("API.HTTP.restricted"), - }) + } for i := 1; i < 8; i++ { var c config.Channel diff --git a/data/config/data.go b/data/config/data.go index e246b8a..caaf3ba 100644 --- a/data/config/data.go +++ b/data/config/data.go @@ -35,5 +35,5 @@ type Config struct { Board TH7 `json:"TH7"` Channels []Channel `json:"channels"` DB map[string]interface{} `json:"db"` - API []API `json:"API"` + API API `json:"API"` } diff --git a/go.mod b/go.mod index eb49ad1..5b7d04d 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/InfluxCommunity/influxdb3-go v0.4.0 github.com/fatih/color v1.16.0 github.com/gin-gonic/gin v1.8.1 + github.com/gorilla/websocket v1.5.1 ) require ( diff --git a/go.sum b/go.sum index 949fe00..a17e3ca 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/main.go b/main.go index e99eb16..e97f27a 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "os" "os/signal" "syscall" - "th7/api/http" + "th7/api" "th7/config" "th7/core" "th7/db" @@ -59,12 +59,12 @@ func main() { color.Unset() // if noweb is false and HTTP/REST API is enabled then start web service - if !cfg.Board.NoWeb && cfg.API[0].Enabled { - apiPort = http.NewGinAdapter(corePort, cfg) + if !cfg.Board.NoWeb && cfg.API.Enabled { + apiPort = api.NewGinAdapter(corePort, cfg) go apiPort.Run() color.Set(color.FgHiGreen) - fmt.Printf("TH7 API is live on http://localhost:%d/\n", cfg.API[0].Port) + fmt.Printf("TH7 API is live on http://localhost:%d/\n", cfg.API.Port) color.Unset() } diff --git a/static/data-updater.js b/static/data-updater.js index 8eb6f22..712fad1 100644 --- a/static/data-updater.js +++ b/static/data-updater.js @@ -89,3 +89,18 @@ updateConfig(); setInterval(updateRead, 1000); setInterval(updateConfig, 10000); + +// websocket demo +const ws = new WebSocket("ws://" + window.location.host + "/ws"); +ws.onopen = (e) => { + console.log("Connection open"); + ws.send("ALL"); +}; + +ws.onmessage = (e) => { + console.log("msg recv: " + e.data); +}; + +ws.onclose = (e) => { + console.log("Connection close"); +};