simplify websockets
This commit is contained in:
parent
187238c7e4
commit
ea3c096ea9
18
api/api.go
18
api/api.go
@ -54,27 +54,23 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter {
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
socket: conn,
|
||||
send: make(chan []byte),
|
||||
manager: manager,
|
||||
all: false,
|
||||
channels: false,
|
||||
config: false,
|
||||
socket: conn,
|
||||
send: make(chan []byte),
|
||||
manager: manager,
|
||||
}
|
||||
|
||||
manager.register <- client
|
||||
|
||||
go client.read()
|
||||
go client.write()
|
||||
go client.writer()
|
||||
})
|
||||
|
||||
// start web socket service that dumps new data to websocket chan
|
||||
go startService(manager, corePort)
|
||||
|
||||
adapter.router.POST("/v1/config/device", adapter.SetConfigDeviceHandler)
|
||||
adapter.router.POST("/v1/config/channel/:id", adapter.SetConfigChannelByIdHandler)
|
||||
adapter.router.POST("/v1/config/db", adapter.SetConfigDBHandler)
|
||||
|
||||
// start service that dumps new data to websocket chan
|
||||
go startService(manager, corePort)
|
||||
|
||||
// TH7 demo code. html file and javascript
|
||||
adapter.router.GET("/", adapter.IndexHandler)
|
||||
adapter.router.Static("/assets", "./static")
|
||||
|
@ -10,31 +10,35 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
CheckChannelDur = time.Millisecond * 700
|
||||
CheckChannelDur = time.Millisecond * 1200
|
||||
StartupSleepDur = time.Second * 5
|
||||
)
|
||||
|
||||
type ChannelsWrapper struct {
|
||||
Channels []core.Channel `json:"channels"`
|
||||
type Wrapper struct {
|
||||
Channels []core.Channel `json:"channels"`
|
||||
Ratio core.Ratio `json:"ratio"`
|
||||
Timestamp string `json:"time"`
|
||||
}
|
||||
|
||||
// Regularly get the newest data and bung it out on ws
|
||||
func startService(man *Manager, core ports.CorePort) {
|
||||
|
||||
var chwrap ChannelsWrapper
|
||||
var wrap Wrapper
|
||||
|
||||
time.Sleep(StartupSleepDur)
|
||||
|
||||
for {
|
||||
chwrap.Channels = core.GetChannels()
|
||||
marshaled, err := json.Marshal(chwrap)
|
||||
wrap.Channels = core.GetChannels()
|
||||
wrap.Ratio = core.GetRatio()
|
||||
wrap.Timestamp = time.Now().Format(time.RFC1123)
|
||||
|
||||
marshaled, err := json.Marshal(wrap)
|
||||
if err != nil {
|
||||
log.Println("Error marshaling struct:", chwrap.Channels)
|
||||
log.Println("Error marshaling struct:", wrap)
|
||||
}
|
||||
|
||||
man.broadcast <- Message{
|
||||
messageType: CHANNELS,
|
||||
data: []byte(marshaled),
|
||||
}
|
||||
man.broadcast <- []byte(marshaled)
|
||||
|
||||
time.Sleep(CheckChannelDur)
|
||||
}
|
||||
|
||||
|
115
api/websocket.go
115
api/websocket.go
@ -1,10 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
@ -17,31 +14,15 @@ var upgrader = websocket.Upgrader{
|
||||
},
|
||||
}
|
||||
|
||||
type MessageType = int
|
||||
|
||||
const (
|
||||
ALL MessageType = iota
|
||||
CHANNELS
|
||||
CONFIG
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
messageType MessageType
|
||||
data []byte
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
socket *websocket.Conn
|
||||
send chan []byte
|
||||
manager *Manager
|
||||
channels bool
|
||||
config bool
|
||||
all bool
|
||||
socket *websocket.Conn
|
||||
send chan []byte
|
||||
manager *Manager
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
clients map[*Client]bool
|
||||
broadcast chan Message
|
||||
broadcast chan []byte
|
||||
register chan *Client
|
||||
unregister chan *Client
|
||||
}
|
||||
@ -49,50 +30,12 @@ type Manager struct {
|
||||
func NewManager() *Manager {
|
||||
return &Manager{
|
||||
clients: make(map[*Client]bool),
|
||||
broadcast: make(chan Message),
|
||||
broadcast: make(chan []byte),
|
||||
register: make(chan *Client),
|
||||
unregister: make(chan *Client),
|
||||
}
|
||||
}
|
||||
|
||||
func (man *Manager) BroadcastAll(msg []byte) {
|
||||
for client := range man.clients {
|
||||
select {
|
||||
case client.send <- msg:
|
||||
default:
|
||||
man.unregister <- client
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (man *Manager) BroadcastChannels(msg []byte) {
|
||||
for client := range man.clients {
|
||||
if !client.all && !client.channels {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case client.send <- msg:
|
||||
default:
|
||||
man.unregister <- client
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (man *Manager) BroadcastConfig(msg []byte) {
|
||||
for client := range man.clients {
|
||||
if !client.all && !client.config {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case client.send <- msg:
|
||||
default:
|
||||
man.unregister <- client
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (man *Manager) start() {
|
||||
for {
|
||||
select {
|
||||
@ -103,56 +46,25 @@ func (man *Manager) start() {
|
||||
// connection closed
|
||||
case conn := <-man.unregister:
|
||||
if _, ok := man.clients[conn]; ok {
|
||||
fmt.Println("Unregistering:", conn)
|
||||
close(conn.send)
|
||||
delete(man.clients, conn)
|
||||
}
|
||||
|
||||
// broadcast a message to all connected clients
|
||||
case message := <-man.broadcast:
|
||||
switch message.messageType {
|
||||
case ALL:
|
||||
man.BroadcastAll(message.data)
|
||||
case CHANNELS:
|
||||
man.BroadcastChannels(message.data)
|
||||
case CONFIG:
|
||||
man.BroadcastConfig(message.data)
|
||||
for client := range man.clients {
|
||||
select {
|
||||
case client.send <- message:
|
||||
default:
|
||||
man.unregister <- client
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) read() {
|
||||
|
||||
defer func() {
|
||||
c.manager.unregister <- c
|
||||
_ = c.socket.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
_, message, err := c.socket.ReadMessage()
|
||||
if err != nil {
|
||||
c.manager.unregister <- c
|
||||
_ = c.socket.Close()
|
||||
break
|
||||
}
|
||||
|
||||
switch strings.ToLower(string(message)) {
|
||||
case "all":
|
||||
c.all = true
|
||||
case "channels":
|
||||
c.channels = true
|
||||
case "config":
|
||||
c.config = true
|
||||
default:
|
||||
log.Println("received: ", message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) write() {
|
||||
func (c *Client) writer() {
|
||||
|
||||
defer func() {
|
||||
_ = c.socket.Close()
|
||||
@ -163,12 +75,13 @@ func (c *Client) write() {
|
||||
case message, ok := <-c.send:
|
||||
if !ok {
|
||||
_ = c.socket.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
c.manager.unregister <- c
|
||||
return
|
||||
}
|
||||
|
||||
err := c.socket.WriteMessage(websocket.TextMessage, message)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
c.manager.unregister <- c
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -91,16 +91,16 @@ setInterval(updateRead, 1000);
|
||||
setInterval(updateConfig, 10000);
|
||||
|
||||
// websocket demo
|
||||
const ws = new WebSocket("ws://" + window.location.host + "/ws");
|
||||
ws.onopen = (e) => {
|
||||
const ws = new WebSocket(`ws://${window.location.host}/ws`);
|
||||
|
||||
ws.addEventListener("open", (event) => {
|
||||
console.log("Connection open");
|
||||
ws.send("ALL");
|
||||
};
|
||||
});
|
||||
|
||||
ws.onmessage = (e) => {
|
||||
console.log("msg recv: " + e.data);
|
||||
};
|
||||
ws.addEventListener("message", (event) => {
|
||||
console.log(`Message received: ${event.data}`);
|
||||
});
|
||||
|
||||
ws.onclose = (e) => {
|
||||
console.log("Connection close");
|
||||
};
|
||||
ws.addEventListener("close", (event) => {
|
||||
console.log("Connection closed");
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user