th7/db/influxdb.go

134 lines
3.1 KiB
Go
Raw Normal View History

2023-11-23 20:46:54 +00:00
package db
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"th7/data/config"
"th7/data/core"
"th7/data/thermocouple"
"time"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)
// ErrNoToken is returned when no InfluxDB token is configured.
var ErrNoToken = errors.New("InfluxDB requires a secret token field")

// ErrNoBucket is returned when no InfluxDB bucket (database name) is configured.
var ErrNoBucket = errors.New("InfluxDB requires a bucket field (database name)")

// ErrNoHost is returned when no InfluxDB host URL is configured.
var ErrNoHost = errors.New("InfluxDB requires a host field (http://example-influxdb-host.com)")
// Upper bounds on how long a single InfluxDB operation may take.
const (
	// INFLUXDB_MAX_INIT_DUR limits the connection test run while constructing
	// the adapter.
	INFLUXDB_MAX_INIT_DUR = 5 * time.Second
	// INFLUXDB_MAX_SAVE_DUR limits one write issued by Save.
	INFLUXDB_MAX_SAVE_DUR = 2 * time.Second
)
// InfluxDBAdapter stores channel readings in an InfluxDB database.
//
// It embeds a sync.Mutex by value, so it must be used through a pointer and
// never copied.
type InfluxDBAdapter struct {
	// mu is held by Save for the duration of each write.
	mu sync.Mutex
	// client is the InfluxDB connection created by NewInfluxDBAdapter.
	client *influxdb3.Client
	// cfg is the configuration the adapter was built from; Save reads the
	// per-channel settings out of it.
	cfg config.Config
	// Connection settings handed to the client at construction time.
	token, bucket, host string
}
// NewInfluxDBAdapter builds an InfluxDB adapter from config and checks the
// connection by running a trivial query, bounded by INFLUXDB_MAX_INIT_DUR.
//
// The "token", "bucket" and "host" settings are read from config.DB. When a
// key is absent, the INFLUXDB_TOKEN, INFLUXDB_BUCKET and INFLUXDB_HOST
// environment variables are consulted instead, and a value found there is
// stored back into config.DB (when that map is non-nil). A setting found in
// neither place yields ErrNoToken, ErrNoBucket or ErrNoHost; a setting that is
// present but not a string yields an error wrapping the same sentinel, so
// errors.Is still matches.
//
// A non-nil adapter is returned even when the error is non-nil; it may be only
// partially initialised, so callers must check the error before using it.
func NewInfluxDBAdapter(config config.Config) (*InfluxDBAdapter, error) {
	var adapter InfluxDBAdapter

	// resolve looks up one connection setting: config.DB first, then the
	// environment. missing is the sentinel reported when neither has it.
	resolve := func(key, envVar string, missing error) (string, error) {
		if raw, present := config.DB[key]; present {
			// A checked assertion: a mistyped config value (number, bool, ...)
			// becomes an error instead of a panic.
			text, isString := raw.(string)
			if !isString {
				return "", fmt.Errorf("%w: %q is %T, want string", missing, key, raw)
			}
			return text, nil
		}

		fromEnv := os.Getenv(envVar)
		if len(fromEnv) == 0 {
			return "", missing
		}
		// Assigning into a nil map panics, so only write back when the
		// caller actually supplied a DB map.
		if config.DB != nil {
			config.DB[key] = fromEnv
		}
		return fromEnv, nil
	}

	token, err := resolve("token", "INFLUXDB_TOKEN", ErrNoToken)
	if err != nil {
		return &adapter, err
	}
	bucket, err := resolve("bucket", "INFLUXDB_BUCKET", ErrNoBucket)
	if err != nil {
		return &adapter, err
	}
	host, err := resolve("host", "INFLUXDB_HOST", ErrNoHost)
	if err != nil {
		return &adapter, err
	}

	adapter.cfg = config
	adapter.token = token
	adapter.bucket = bucket
	adapter.host = host

	adapter.client, err = influxdb3.New(influxdb3.ClientConfig{
		Host:     adapter.host,
		Token:    adapter.token,
		Database: adapter.bucket,
	})
	if err != nil {
		return &adapter, err
	}

	initContext, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_INIT_DUR)
	defer cancel()

	// Test database connection/auth
	query := `
SELECT 1;
`
	_, err = adapter.client.Query(initContext, query)
	if err != nil {
		fmt.Println("Error initialising InfluxDB. Check details/network connection.")
		return &adapter, err
	}

	return &adapter, nil
}
// Close shuts down the adapter's InfluxDB client.
//
// It does nothing when the adapter is nil or has no client, which is the state
// of an adapter returned by NewInfluxDBAdapter alongside a configuration
// error. Any result of the client's Close call is discarded.
func (ad *InfluxDBAdapter) Close() {
	if ad == nil || ad.client == nil {
		return
	}
	ad.client.Close()
}
// Save writes one line-protocol record per channel to InfluxDB in a single
// request bounded by INFLUXDB_MAX_SAVE_DUR.
//
// Each record is tagged with the channel id and thermocouple type and carries
// the configured gain and offset plus the measured value. channels[i] is
// paired with the i-th entry of the adapter's channel configuration, so an
// error is returned if more channels are supplied than are configured.
// An empty channels slice is a no-op.
//
// A failed write is returned as a wrapped error; the success line is printed
// only after the write has completed.
func (ad *InfluxDBAdapter) Save(channels []core.Channel) error {
	if len(channels) == 0 {
		return nil
	}

	ad.mu.Lock()
	defer ad.mu.Unlock()

	// NOTE(review): assumes cfg.Channels is a slice indexed in the same order
	// as channels - confirm against the config package.
	if len(channels) > len(ad.cfg.Channels) {
		return fmt.Errorf("saving to InfluxDB: %d channels supplied but only %d configured",
			len(channels), len(ad.cfg.Channels))
	}

	var sb strings.Builder

	// influx does timestamping when data arrives to server
	for i := range channels {
		thermo := thermocouple.ThermoStringLookup[ad.cfg.Channels[i].Thermo]
		fmt.Fprintf(&sb, "TH7,channel=%d,thermocouple=%s gain=%g,offset=%g,value=%g\n",
			channels[i].Id, thermo, ad.cfg.Channels[i].Gain, ad.cfg.Channels[i].Offset, channels[i].Value)
	}

	ctx, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_SAVE_DUR)
	defer cancel()

	if err := ad.client.Write(ctx, []byte(sb.String())); err != nil {
		return fmt.Errorf("writing %d channels to InfluxDB: %w", len(channels), err)
	}

	fmt.Printf("[InfluxDB] logged %d channels OK\n", len(channels))
	return nil
}