kiast-drone-collector/pkg/drone/service.go
2019-08-09 14:06:04 +09:00

138 lines
3.2 KiB
Go

package drone
import (
"bufio"
"log"
"net"
"os"
"sync"
"time"
influxdb "github.com/influxdata/influxdb1-client/v2"
)
// Service accepts TCP connections, reads '#'-terminated frames from them and
// writes the parsed values to InfluxDB.
type Service struct {
// ch is the stop signal: Stop closes it, and the accept loop and every
// connection goroutine poll it to know when to exit.
ch chan bool
// influx is the InfluxDB client; it is created in Serve and used by the
// connection goroutines to write points.
influx influxdb.Client
// waitGroup counts the accept loop plus one entry per open connection so
// that Stop can block until all of them have returned.
waitGroup *sync.WaitGroup
}
// NewService returns a Service with an open stop channel and a wait group
// that already holds one pending entry. That entry is released by Serve when
// its accept loop returns.
func NewService() *Service {
	var wg sync.WaitGroup
	// Register the accept loop up front so waiting on the group blocks
	// until Serve has finished.
	wg.Add(1)

	return &Service{
		ch:        make(chan bool),
		waitGroup: &wg,
	}
}
// Serve creates the InfluxDB client, then accepts connections on listener and
// spawns a goroutine to serve each one. It stops listening, closes the
// listener and returns once the service's channel is closed.
//
// The accept call uses a one-second deadline so the stop channel is polled at
// least once per second. Serve releases its wait-group entry on return.
func (s *Service) Serve(listener *net.TCPListener) {
	// Pause after a failed accept so a persistent error (for example file
	// descriptor exhaustion) cannot spin the loop and flood the log.
	const acceptBackoff = 100 * time.Millisecond

	defer s.waitGroup.Done()

	influx, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
		Addr:     "http://localhost:8086",
		Username: os.Getenv("INFLUX_USER"),
		Password: os.Getenv("INFLUX_PWD"),
	})
	if err != nil {
		// Serve has no error return and the service cannot run without a
		// client, so a failure to build one is treated as fatal.
		panic(err)
	}
	s.influx = influx

	for {
		select {
		case <-s.ch:
			log.Println("stopping listening on", listener.Addr())
			if err := listener.Close(); err != nil {
				log.Println(err)
			}
			return
		default:
		}

		if err := listener.SetDeadline(time.Now().Add(time.Second)); err != nil {
			log.Println(err)
		}
		conn, err := listener.AcceptTCP()
		if err != nil {
			// A timeout only means nobody connected within the deadline;
			// loop around to poll the stop channel again.
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				continue
			}
			// Any other error leaves conn nil, so it must not be handed
			// to a connection goroutine.
			log.Println(err)
			time.Sleep(acceptBackoff)
			continue
		}

		log.Println(conn.RemoteAddr(), "connected")
		s.waitGroup.Add(1)
		go s.serve(conn)
	}
}
// Stop shuts the service down: it closes the service's channel to signal the
// accept loop and every connection goroutine, blocks until they have all
// returned, and only then closes the InfluxDB client.
//
// Stop must be called at most once; a second call would close the
// already-closed channel and panic.
func (s *Service) Stop() {
	close(s.ch)
	s.waitGroup.Wait()

	// The client is closed last so that no connection goroutine can still be
	// writing through it, and so that the read of s.influx is ordered after
	// Serve's assignment by the wait group. The nil check covers the case
	// where Serve exited before it created a client.
	if s.influx == nil {
		return
	}
	if err := s.influx.Close(); err != nil {
		log.Println(err)
	}
}
// serve handles one connection: it reads '#'-terminated frames, parses each
// with parseProtocol and writes the result to InfluxDB as a single
// "drone_gps" point stamped with the time of receipt.
//
// It returns when the service's channel is closed, when the peer disconnects,
// or on any read error other than a timeout. A frame that fails to parse or
// to convert into a point is logged and skipped. The connection is closed and
// the wait-group entry released on return.
func (s *Service) serve(conn *net.TCPConn) {
	// maxFrameSize bounds how many bytes are buffered while waiting for a
	// '#' terminator, so a peer that never sends one cannot grow memory
	// without limit.
	const maxFrameSize = 64 << 10

	defer conn.Close()
	defer s.waitGroup.Done()

	// One reader for the whole connection: creating a new one per frame
	// would throw away bytes already buffered past the previous '#'.
	reader := bufio.NewReader(conn)
	// pending holds the part of a frame received so far when a read timed
	// out before the terminator arrived.
	var pending []byte

	for {
		select {
		case <-s.ch:
			log.Println("disconnecting", conn.RemoteAddr())
			return
		default:
		}

		// One-second deadline so the stop channel is polled regularly.
		if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
			log.Println(err)
			return
		}

		chunk, err := reader.ReadBytes('#')
		// ReadBytes returns whatever it read before an error, so keep it.
		pending = append(pending, chunk...)
		if err != nil {
			netErr, ok := err.(net.Error)
			if !ok || !netErr.Timeout() {
				log.Println(err)
				return
			}
			if len(pending) > maxFrameSize {
				log.Println("frame exceeds", maxFrameSize, "bytes without terminator; disconnecting", conn.RemoteAddr())
				return
			}
			continue
		}
		frame := pending
		pending = nil

		p, err := parseProtocol(frame)
		if err != nil {
			// p is not meaningful after a parse failure; drop the frame.
			log.Println(err)
			continue
		}

		bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
			Database:  "kiast",
			Precision: "s",
		})
		if err != nil {
			log.Println(err)
			continue
		}

		// Create a point and add it to the batch.
		tags := map[string]string{}
		// NOTE(review): InfluxDB 1.x appears to treat "time" as a reserved
		// key; confirm that this field is actually stored by the server.
		fields := map[string]interface{}{
			"time":                      p.Time,
			"latitude":                  p.Latitude,
			"longitude":                 p.Longitude,
			"altitude":                  p.Altitude,
			"speed":                     p.Speed,
			"direction":                 p.Direction,
			"fixed":                     p.Fixed,
			"lte_status":                p.LTEStatus,
			"emm_error":                 p.EMMError,
			"esm_error":                 p.ESMError,
			"signal_strength":           p.SignalStrength,
			"satellite_count":           p.SatelliteCount,
			"satellite_signal_strength": p.SatelliteSignalStrength,
		}
		pt, err := influxdb.NewPoint("drone_gps", tags, fields, time.Now())
		if err != nil {
			// pt is nil here and must not be added to the batch.
			log.Println(err)
			continue
		}
		bp.AddPoint(pt)

		if err := s.influx.Write(bp); err != nil {
			log.Println(err)
		}
	}
}