timeout save() and cleanup
This commit is contained in:
parent
d7f138b7b9
commit
627834c072
@ -30,9 +30,6 @@ type InfluxDBAdapter struct {
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
client *influxdb3.Client
|
client *influxdb3.Client
|
||||||
cfg config.Config
|
cfg config.Config
|
||||||
token string
|
|
||||||
bucket string
|
|
||||||
host string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInfluxDBAdapter(config config.Config) (*InfluxDBAdapter, error) {
|
func NewInfluxDBAdapter(config config.Config) (*InfluxDBAdapter, error) {
|
||||||
@ -67,14 +64,11 @@ func NewInfluxDBAdapter(config config.Config) (*InfluxDBAdapter, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
adapter.cfg = config
|
adapter.cfg = config
|
||||||
adapter.token = config.DB["token"].(string)
|
|
||||||
adapter.bucket = config.DB["bucket"].(string)
|
|
||||||
adapter.host = config.DB["host"].(string)
|
|
||||||
|
|
||||||
adapter.client, err = influxdb3.New(influxdb3.ClientConfig{
|
adapter.client, err = influxdb3.New(influxdb3.ClientConfig{
|
||||||
Host: adapter.host,
|
Host: config.DB["host"].(string),
|
||||||
Token: adapter.token,
|
Token: config.DB["token"].(string),
|
||||||
Database: adapter.bucket,
|
Database: config.DB["bucket"].(string),
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -120,12 +114,12 @@ func (ad *InfluxDBAdapter) Save(channels []core.Channel) error {
|
|||||||
id, thermo, gain, offset, value))
|
id, thermo, gain, offset, value))
|
||||||
}
|
}
|
||||||
|
|
||||||
initContext, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_SAVE_DUR)
|
saveContext, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_SAVE_DUR)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
fmt.Printf("[InfluxDB] logged %d channels OK\n", len(channels))
|
fmt.Printf("[InfluxDB] logged %d channels OK\n", len(channels))
|
||||||
|
|
||||||
err = ad.client.Write(initContext, []byte(sb.String()))
|
err = ad.client.Write(saveContext, []byte(sb.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -20,6 +21,11 @@ type SQLite3Adapter struct {
|
|||||||
cfg config.Config
|
cfg config.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
SQLITE3_MAX_INIT_DUR = 5000 * time.Millisecond
|
||||||
|
SQLITE3_MAX_SAVE_DUR = 2000 * time.Millisecond
|
||||||
|
)
|
||||||
|
|
||||||
func NewSQLite3Adapter(config config.Config) (*SQLite3Adapter, error) {
|
func NewSQLite3Adapter(config config.Config) (*SQLite3Adapter, error) {
|
||||||
|
|
||||||
var adapter SQLite3Adapter
|
var adapter SQLite3Adapter
|
||||||
@ -39,6 +45,9 @@ func NewSQLite3Adapter(config config.Config) (*SQLite3Adapter, error) {
|
|||||||
adapter.db = db
|
adapter.db = db
|
||||||
adapter.cfg = config
|
adapter.cfg = config
|
||||||
|
|
||||||
|
initContext, cancel := context.WithTimeout(context.Background(), SQLITE3_MAX_INIT_DUR)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
const create string = `
|
const create string = `
|
||||||
CREATE TABLE IF NOT EXISTS logs (
|
CREATE TABLE IF NOT EXISTS logs (
|
||||||
id INTEGER NOT NULL,
|
id INTEGER NOT NULL,
|
||||||
@ -49,7 +58,7 @@ func NewSQLite3Adapter(config config.Config) (*SQLite3Adapter, error) {
|
|||||||
timestamp TEXT NOT NULL
|
timestamp TEXT NOT NULL
|
||||||
);`
|
);`
|
||||||
|
|
||||||
if _, err := adapter.db.Exec(create); err != nil {
|
if _, err := adapter.db.ExecContext(initContext, create); err != nil {
|
||||||
return &adapter, err
|
return &adapter, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,6 +81,9 @@ func (ad *SQLite3Adapter) Save(channels []core.Channel) error {
|
|||||||
|
|
||||||
timestamp := time.Now().Format(time.DateTime)
|
timestamp := time.Now().Format(time.DateTime)
|
||||||
|
|
||||||
|
saveContext, cancel := context.WithTimeout(context.Background(), INFLUXDB_MAX_SAVE_DUR)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
for c := range channels {
|
for c := range channels {
|
||||||
|
|
||||||
id := channels[c].Id
|
id := channels[c].Id
|
||||||
@ -80,7 +92,7 @@ func (ad *SQLite3Adapter) Save(channels []core.Channel) error {
|
|||||||
offset := ad.cfg.Channels[c].Offset
|
offset := ad.cfg.Channels[c].Offset
|
||||||
thermo := thermocouple.ThermoStringLookup[ad.cfg.Channels[c].Thermo]
|
thermo := thermocouple.ThermoStringLookup[ad.cfg.Channels[c].Thermo]
|
||||||
|
|
||||||
_, err = statement.Exec(id, thermo, gain, offset, value, timestamp)
|
_, err = statement.ExecContext(saveContext, id, thermo, gain, offset, value, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user