Compare commits
3 Commits
187238c7e4
...
eda649c65f
Author | SHA1 | Date | |
---|---|---|---|
eda649c65f | |||
dcd1a03c69 | |||
ea3c096ea9 |
41
api/api.go
41
api/api.go
@ -28,7 +28,7 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter {
|
|||||||
adapter.port = cfg.API.Port
|
adapter.port = cfg.API.Port
|
||||||
adapter.cfg = cfg
|
adapter.cfg = cfg
|
||||||
|
|
||||||
//gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
|
|
||||||
adapter.router = gin.New()
|
adapter.router = gin.New()
|
||||||
|
|
||||||
@ -37,15 +37,20 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter {
|
|||||||
go manager.start()
|
go manager.start()
|
||||||
|
|
||||||
// API
|
// API
|
||||||
adapter.router.GET("/v1/data/channel/:id", adapter.GetChannelByIDHandler)
|
v1 := adapter.router.Group("/v1")
|
||||||
adapter.router.GET("/v1/data/channels", adapter.GetChannelsHandler)
|
v1.GET("/data/channel/:id", adapter.GetChannelByIDHandler)
|
||||||
adapter.router.GET("/v1/data/ratio", adapter.GetRatioHandler)
|
v1.GET("/data/channels", adapter.GetChannelsHandler)
|
||||||
adapter.router.GET("/v1/data", adapter.GetDataHandler)
|
v1.GET("/data/ratio", adapter.GetRatioHandler)
|
||||||
|
v1.GET("/data", adapter.GetDataHandler)
|
||||||
|
|
||||||
adapter.router.GET("/v1/config/channel/:id", adapter.GetConfigChannelByIdHandler)
|
v1.GET("/config/channel/:id", adapter.GetConfigChannelByIdHandler)
|
||||||
adapter.router.GET("/v1/config/channels", adapter.GetConfigChannelsHandler)
|
v1.GET("/config/channels", adapter.GetConfigChannelsHandler)
|
||||||
adapter.router.GET("/v1/config/device", adapter.GetConfigDeviceHandler)
|
v1.GET("/config/device", adapter.GetConfigDeviceHandler)
|
||||||
adapter.router.GET("/v1/config/db", adapter.GetConfigDBHandler)
|
v1.GET("/config/db", adapter.GetConfigDBHandler)
|
||||||
|
|
||||||
|
v1.POST("/config/device", adapter.SetConfigDeviceHandler)
|
||||||
|
v1.POST("/config/channel/:id", adapter.SetConfigChannelByIdHandler)
|
||||||
|
v1.POST("/config/db", adapter.SetConfigDBHandler)
|
||||||
|
|
||||||
adapter.router.GET("/ws", func(c *gin.Context) {
|
adapter.router.GET("/ws", func(c *gin.Context) {
|
||||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||||
@ -54,25 +59,17 @@ func NewGinAdapter(corePort ports.CorePort, cfg config.Config) *GinAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
client := &Client{
|
client := &Client{
|
||||||
socket: conn,
|
socket: conn,
|
||||||
send: make(chan []byte),
|
send: make(chan []byte),
|
||||||
manager: manager,
|
manager: manager,
|
||||||
all: false,
|
|
||||||
channels: false,
|
|
||||||
config: false,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
manager.register <- client
|
manager.register <- client
|
||||||
|
|
||||||
go client.read()
|
go client.writer()
|
||||||
go client.write()
|
|
||||||
})
|
})
|
||||||
|
|
||||||
adapter.router.POST("/v1/config/device", adapter.SetConfigDeviceHandler)
|
// start web socket service that dumps new data to websocket chan
|
||||||
adapter.router.POST("/v1/config/channel/:id", adapter.SetConfigChannelByIdHandler)
|
|
||||||
adapter.router.POST("/v1/config/db", adapter.SetConfigDBHandler)
|
|
||||||
|
|
||||||
// start service that dumps new data to websocket chan
|
|
||||||
go startService(manager, corePort)
|
go startService(manager, corePort)
|
||||||
|
|
||||||
// TH7 demo code. html file and javascript
|
// TH7 demo code. html file and javascript
|
||||||
|
@ -10,31 +10,35 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
CheckChannelDur = time.Millisecond * 700
|
CheckChannelDur = time.Millisecond * 1200
|
||||||
StartupSleepDur = time.Second * 5
|
StartupSleepDur = time.Second * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
type ChannelsWrapper struct {
|
type Wrapper struct {
|
||||||
Channels []core.Channel `json:"channels"`
|
Channels []core.Channel `json:"channels"`
|
||||||
|
Ratio core.Ratio `json:"ratio"`
|
||||||
|
Timestamp string `json:"time"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regularly get the newest data and bung it out on ws
|
||||||
func startService(man *Manager, core ports.CorePort) {
|
func startService(man *Manager, core ports.CorePort) {
|
||||||
|
|
||||||
var chwrap ChannelsWrapper
|
var wrap Wrapper
|
||||||
|
|
||||||
time.Sleep(StartupSleepDur)
|
time.Sleep(StartupSleepDur)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
chwrap.Channels = core.GetChannels()
|
wrap.Channels = core.GetChannels()
|
||||||
marshaled, err := json.Marshal(chwrap)
|
wrap.Ratio = core.GetRatio()
|
||||||
|
wrap.Timestamp = time.Now().Format(time.RFC1123)
|
||||||
|
|
||||||
|
marshaled, err := json.Marshal(wrap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Error marshaling struct:", chwrap.Channels)
|
log.Println("Error marshaling struct:", wrap)
|
||||||
}
|
}
|
||||||
|
|
||||||
man.broadcast <- Message{
|
man.broadcast <- []byte(marshaled)
|
||||||
messageType: CHANNELS,
|
|
||||||
data: []byte(marshaled),
|
|
||||||
}
|
|
||||||
time.Sleep(CheckChannelDur)
|
time.Sleep(CheckChannelDur)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
115
api/websocket.go
115
api/websocket.go
@ -1,10 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
@ -17,31 +14,15 @@ var upgrader = websocket.Upgrader{
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
type MessageType = int
|
|
||||||
|
|
||||||
const (
|
|
||||||
ALL MessageType = iota
|
|
||||||
CHANNELS
|
|
||||||
CONFIG
|
|
||||||
)
|
|
||||||
|
|
||||||
type Message struct {
|
|
||||||
messageType MessageType
|
|
||||||
data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
socket *websocket.Conn
|
socket *websocket.Conn
|
||||||
send chan []byte
|
send chan []byte
|
||||||
manager *Manager
|
manager *Manager
|
||||||
channels bool
|
|
||||||
config bool
|
|
||||||
all bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
clients map[*Client]bool
|
clients map[*Client]bool
|
||||||
broadcast chan Message
|
broadcast chan []byte
|
||||||
register chan *Client
|
register chan *Client
|
||||||
unregister chan *Client
|
unregister chan *Client
|
||||||
}
|
}
|
||||||
@ -49,50 +30,12 @@ type Manager struct {
|
|||||||
func NewManager() *Manager {
|
func NewManager() *Manager {
|
||||||
return &Manager{
|
return &Manager{
|
||||||
clients: make(map[*Client]bool),
|
clients: make(map[*Client]bool),
|
||||||
broadcast: make(chan Message),
|
broadcast: make(chan []byte),
|
||||||
register: make(chan *Client),
|
register: make(chan *Client),
|
||||||
unregister: make(chan *Client),
|
unregister: make(chan *Client),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (man *Manager) BroadcastAll(msg []byte) {
|
|
||||||
for client := range man.clients {
|
|
||||||
select {
|
|
||||||
case client.send <- msg:
|
|
||||||
default:
|
|
||||||
man.unregister <- client
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (man *Manager) BroadcastChannels(msg []byte) {
|
|
||||||
for client := range man.clients {
|
|
||||||
if !client.all && !client.channels {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case client.send <- msg:
|
|
||||||
default:
|
|
||||||
man.unregister <- client
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (man *Manager) BroadcastConfig(msg []byte) {
|
|
||||||
for client := range man.clients {
|
|
||||||
if !client.all && !client.config {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case client.send <- msg:
|
|
||||||
default:
|
|
||||||
man.unregister <- client
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (man *Manager) start() {
|
func (man *Manager) start() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -103,56 +46,25 @@ func (man *Manager) start() {
|
|||||||
// connection closed
|
// connection closed
|
||||||
case conn := <-man.unregister:
|
case conn := <-man.unregister:
|
||||||
if _, ok := man.clients[conn]; ok {
|
if _, ok := man.clients[conn]; ok {
|
||||||
fmt.Println("Unregistering:", conn)
|
|
||||||
close(conn.send)
|
close(conn.send)
|
||||||
delete(man.clients, conn)
|
delete(man.clients, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// broadcast a message to all connected clients
|
// broadcast a message to all connected clients
|
||||||
case message := <-man.broadcast:
|
case message := <-man.broadcast:
|
||||||
switch message.messageType {
|
for client := range man.clients {
|
||||||
case ALL:
|
select {
|
||||||
man.BroadcastAll(message.data)
|
case client.send <- message:
|
||||||
case CHANNELS:
|
default:
|
||||||
man.BroadcastChannels(message.data)
|
man.unregister <- client
|
||||||
case CONFIG:
|
}
|
||||||
man.BroadcastConfig(message.data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) read() {
|
func (c *Client) writer() {
|
||||||
|
|
||||||
defer func() {
|
|
||||||
c.manager.unregister <- c
|
|
||||||
_ = c.socket.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
_, message, err := c.socket.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
c.manager.unregister <- c
|
|
||||||
_ = c.socket.Close()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
switch strings.ToLower(string(message)) {
|
|
||||||
case "all":
|
|
||||||
c.all = true
|
|
||||||
case "channels":
|
|
||||||
c.channels = true
|
|
||||||
case "config":
|
|
||||||
c.config = true
|
|
||||||
default:
|
|
||||||
log.Println("received: ", message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) write() {
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = c.socket.Close()
|
_ = c.socket.Close()
|
||||||
@ -163,12 +75,13 @@ func (c *Client) write() {
|
|||||||
case message, ok := <-c.send:
|
case message, ok := <-c.send:
|
||||||
if !ok {
|
if !ok {
|
||||||
_ = c.socket.WriteMessage(websocket.CloseMessage, []byte{})
|
_ = c.socket.WriteMessage(websocket.CloseMessage, []byte{})
|
||||||
|
c.manager.unregister <- c
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.socket.WriteMessage(websocket.TextMessage, message)
|
err := c.socket.WriteMessage(websocket.TextMessage, message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
c.manager.unregister <- c
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package config
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
"th7/data/config"
|
"th7/data/config"
|
||||||
"th7/data/thermocouple"
|
"th7/data/thermocouple"
|
||||||
@ -80,7 +81,7 @@ func Load() (config.Config, error) {
|
|||||||
|
|
||||||
tc, err := GetThermocoupleType(v.GetString(head + ".type"))
|
tc, err := GetThermocoupleType(v.GetString(head + ".type"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("%s.type=%s\n", head, v.GetString(head+".type"))
|
log.Printf("%s.type=%s\n", head, v.GetString(head+".type"))
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -84,7 +85,7 @@ func NewInfluxDBAdapter(config config.Config) (*InfluxDBAdapter, error) {
|
|||||||
`
|
`
|
||||||
_, err = adapter.client.Query(initContext, query)
|
_, err = adapter.client.Query(initContext, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error initialising InfluxDB. Check details/network connection.")
|
log.Println("Error initialising InfluxDB. Check details/network connection.")
|
||||||
return &adapter, err
|
return &adapter, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,7 +125,7 @@ func (ad *InfluxDBAdapter) Save(channels []core.Channel) error {
|
|||||||
|
|
||||||
// need at least 1 iteration to bother sending data to influx server
|
// need at least 1 iteration to bother sending data to influx server
|
||||||
if logged == 0 {
|
if logged == 0 {
|
||||||
fmt.Printf("[%s] InfluxDB: nothing to do.\n", time.Now().Format(time.DateTime))
|
log.Printf("[%s] InfluxDB: nothing to do.\n", time.Now().Format(time.DateTime))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,7 +137,7 @@ func (ad *InfluxDBAdapter) Save(channels []core.Channel) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("[%s] InfluxDB: logged %d channels OK\n", time.Now().Format(time.DateTime), logged)
|
log.Printf("[%s] InfluxDB: logged %d channels OK\n", time.Now().Format(time.DateTime), logged)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
5
main.go
5
main.go
@ -1,7 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@ -55,7 +54,7 @@ func main() {
|
|||||||
defer dbPort.Close()
|
defer dbPort.Close()
|
||||||
|
|
||||||
color.Set(color.FgHiRed)
|
color.Set(color.FgHiRed)
|
||||||
fmt.Printf("Started on: %s\n", time.Now().Format(time.DateTime))
|
log.Printf("Started on: %s\n", time.Now().Format(time.DateTime))
|
||||||
color.Unset()
|
color.Unset()
|
||||||
|
|
||||||
// if noweb is false and HTTP/REST API is enabled then start web service
|
// if noweb is false and HTTP/REST API is enabled then start web service
|
||||||
@ -64,7 +63,7 @@ func main() {
|
|||||||
go apiPort.Run()
|
go apiPort.Run()
|
||||||
|
|
||||||
color.Set(color.FgHiGreen)
|
color.Set(color.FgHiGreen)
|
||||||
fmt.Printf("TH7 API is live on http://localhost:%d/\n", cfg.API.Port)
|
log.Printf("TH7 API is live on http://localhost:%d/\n", cfg.API.Port)
|
||||||
color.Unset()
|
color.Unset()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package pcb
|
package pcb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"th7/data/pcb"
|
"th7/data/pcb"
|
||||||
"time"
|
"time"
|
||||||
@ -17,7 +17,7 @@ func NewDummyAdapter() (*DummyAdapter, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *DummyAdapter) Deinit() error {
|
func (d *DummyAdapter) Deinit() error {
|
||||||
fmt.Println("dummy pcb adapter deinit")
|
log.Println("dummy pcb adapter deinit")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,16 +91,16 @@ setInterval(updateRead, 1000);
|
|||||||
setInterval(updateConfig, 10000);
|
setInterval(updateConfig, 10000);
|
||||||
|
|
||||||
// websocket demo
|
// websocket demo
|
||||||
const ws = new WebSocket("ws://" + window.location.host + "/ws");
|
const ws = new WebSocket(`ws://${window.location.host}/ws`);
|
||||||
ws.onopen = (e) => {
|
|
||||||
|
ws.addEventListener("open", (event) => {
|
||||||
console.log("Connection open");
|
console.log("Connection open");
|
||||||
ws.send("ALL");
|
});
|
||||||
};
|
|
||||||
|
|
||||||
ws.onmessage = (e) => {
|
ws.addEventListener("message", (event) => {
|
||||||
console.log("msg recv: " + e.data);
|
console.log(`Message received: ${event.data}`);
|
||||||
};
|
});
|
||||||
|
|
||||||
ws.onclose = (e) => {
|
ws.addEventListener("close", (event) => {
|
||||||
console.log("Connection close");
|
console.log("Connection closed");
|
||||||
};
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user