// Source file: th7/db/influxdb.go

package db
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"th7/data/config"
"th7/data/core"
"th7/data/thermocouple"
"time"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)
// Sentinel errors returned by NewInfluxDBAdapter when a required connection
// setting is present neither in the DB config map nor in the environment.
var (
	// ErrNoHost reports that no "host" entry and no INFLUXDB_HOST variable was found.
	ErrNoHost = errors.New("InfluxDB requires a host field (http://example-influxdb-host.com)")

	// ErrNoBucket reports that no "bucket" entry and no INFLUXDB_BUCKET variable was found.
	ErrNoBucket = errors.New("InfluxDB requires a bucket field (database name)")

	// ErrNoToken reports that no "token" entry and no INFLUXDB_TOKEN variable was found.
	ErrNoToken = errors.New("InfluxDB requires a secret token field")
)
// Upper bounds on how long a single InfluxDB round trip may take.
const (
	// INFLUXDB_MAX_INIT_DUR limits the connection test performed by
	// NewInfluxDBAdapter.
	INFLUXDB_MAX_INIT_DUR = 5 * time.Second

	// INFLUXDB_MAX_SAVE_DUR limits each write issued by Save.
	INFLUXDB_MAX_SAVE_DUR = 2 * time.Second
)
// InfluxDBAdapter stores channel readings in an InfluxDB database.
// Create one with NewInfluxDBAdapter; it contains a mutex, so use it through
// a pointer and do not copy it.
type InfluxDBAdapter struct {
// mu is held for the whole of Save, so concurrent Save calls run one at a time.
mu sync.Mutex
// client is the InfluxDB client created by NewInfluxDBAdapter; it stays nil
// when construction fails before the client is created.
client *influxdb3.Client
// cfg is the configuration passed to NewInfluxDBAdapter; Save reads the
// per-channel Log, Gain, Offset and Thermo settings from it.
cfg config.Config
}
func NewInfluxDBAdapter(config config.Config) (*InfluxDBAdapter, error) {
var adapter InfluxDBAdapter
var err error
// if `token' is not present in the DB map, check env
if _, ok := config.DB["token"]; !ok {
if token := os.Getenv("INFLUXDB_TOKEN"); len(token) > 0 {
config.DB["token"] = token
} else {
return &adapter, ErrNoToken
}
}
// if `bucket' is not present in the DB map, check env
if _, ok := config.DB["bucket"]; !ok {
if bucket := os.Getenv("INFLUXDB_BUCKET"); len(bucket) > 0 {
config.DB["bucket"] = bucket
} else {
return &adapter, ErrNoBucket
}
}
// if `host' is not present in the DB map, check env
if _, ok := config.DB["host"]; !ok {
if host := os.Getenv("INFLUXDB_HOST"); len(host) > 0 {
config.DB["host"] = host
} else {
return &adapter, ErrNoHost
}
}
adapter.cfg = config
adapter.client, err = influxdb3.New(influxdb3.ClientConfig{
2023-11-24 10:46:49 +00:00
Host: config.DB["host"].(string),
Token: config.DB["token"].(string),
Database: config.DB["bucket"].(string),
2023-11-23 20:46:54 +00:00
})
if err != nil {
return &adapter, err
}
initContext, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_INIT_DUR)
defer cancel()
// Test database connection/auth
query := `
SELECT 1;
`
_, err = adapter.client.Query(initContext, query)
if err != nil {
fmt.Println("Error initialising InfluxDB. Check details/network connection.")
return &adapter, err
}
return &adapter, nil
}
// Close closes the underlying InfluxDB client.
//
// It does nothing when the adapter has no client. NewInfluxDBAdapter returns
// such an adapter together with an error when a required setting is missing,
// so calling Close on that result must not dereference a nil client.
func (ad *InfluxDBAdapter) Close() {
	if ad == nil || ad.client == nil {
		return
	}
	ad.client.Close()
}
func (ad *InfluxDBAdapter) Save(channels []core.Channel) error {
ad.mu.Lock()
defer ad.mu.Unlock()
var sb strings.Builder
var err error
2023-11-25 10:37:02 +00:00
logged := 0
2023-11-23 20:46:54 +00:00
// influx does timestamping when data arrives to server
for c := range channels {
2023-11-25 10:37:02 +00:00
if !ad.cfg.Channels[c].Log {
continue
}
2023-11-23 20:46:54 +00:00
id := channels[c].Id
value := channels[c].Value
gain := ad.cfg.Channels[c].Gain
offset := ad.cfg.Channels[c].Offset
thermo := thermocouple.ThermoStringLookup[ad.cfg.Channels[c].Thermo]
sb.WriteString(fmt.Sprintf("TH7,channel=%d,thermocouple=%s gain=%g,offset=%g,value=%g\n",
id, thermo, gain, offset, value))
2023-11-25 10:37:02 +00:00
logged++
}
// need at least 1 iteration to bother sending data to influx server
if logged == 0 {
fmt.Printf("[%s] InfluxDB: nothing to do.\n", time.Now().Format(time.DateTime))
return nil
2023-11-23 20:46:54 +00:00
}
2023-11-24 10:46:49 +00:00
saveContext, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_SAVE_DUR)
2023-11-23 20:46:54 +00:00
defer cancel()
2023-11-24 10:46:49 +00:00
err = ad.client.Write(saveContext, []byte(sb.String()))
2023-11-23 20:46:54 +00:00
if err != nil {
2023-11-23 20:55:10 +00:00
return err
2023-11-23 20:46:54 +00:00
}
2023-11-23 20:55:10 +00:00
2023-11-25 10:37:02 +00:00
fmt.Printf("[%s] InfluxDB: logged %d channels OK\n", time.Now().Format(time.DateTime), logged)
2023-11-24 10:57:34 +00:00
2023-11-23 20:46:54 +00:00
return nil
}