138 lines
3.2 KiB
Go
138 lines
3.2 KiB
Go
package drone
|
|
|
|
import (
|
|
"bufio"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
influxdb "github.com/influxdata/influxdb1-client/v2"
|
|
)
|
|
|
|
// An uninteresting service.
|
|
type Service struct {
|
|
ch chan bool
|
|
influx influxdb.Client
|
|
waitGroup *sync.WaitGroup
|
|
}
|
|
|
|
// Make a new Service.
|
|
func NewService() *Service {
|
|
s := &Service{
|
|
ch: make(chan bool),
|
|
waitGroup: &sync.WaitGroup{},
|
|
}
|
|
s.waitGroup.Add(1)
|
|
return s
|
|
}
|
|
|
|
// Accept connections and spawn a goroutine to serve each one. Stop listening
|
|
// if anything is received on the service's channel.
|
|
func (s *Service) Serve(listener *net.TCPListener) {
|
|
defer s.waitGroup.Done()
|
|
|
|
influx, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
|
|
Addr: "http://localhost:8086",
|
|
Username: os.Getenv("INFLUX_USER"),
|
|
Password: os.Getenv("INFLUX_PWD"),
|
|
})
|
|
if err != nil {
|
|
panic(err) // error handling here; normally we wouldn't use fmt but it works for the example
|
|
}
|
|
|
|
s.influx = influx
|
|
|
|
for {
|
|
select {
|
|
case <-s.ch:
|
|
log.Println("stopping listening on", listener.Addr())
|
|
listener.Close()
|
|
return
|
|
default:
|
|
}
|
|
listener.SetDeadline(time.Now().Add(1e9))
|
|
conn, err := listener.AcceptTCP()
|
|
if nil != err {
|
|
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
|
|
continue
|
|
}
|
|
log.Println(err)
|
|
}
|
|
log.Println(conn.RemoteAddr(), "connected")
|
|
s.waitGroup.Add(1)
|
|
go s.serve(conn)
|
|
}
|
|
}
|
|
|
|
// Stop the service by closing the service's channel. Block until the service
|
|
// is really stopped.
|
|
func (s *Service) Stop() {
|
|
s.influx.Close()
|
|
close(s.ch)
|
|
s.waitGroup.Wait()
|
|
}
|
|
|
|
// Serve a connection by reading and writing what was read. That's right, this
|
|
// is an echo service. Stop reading and writing if anything is received on the
|
|
// service's channel but only after writing what was read.
|
|
func (s *Service) serve(conn *net.TCPConn) {
|
|
defer conn.Close()
|
|
defer s.waitGroup.Done()
|
|
for {
|
|
select {
|
|
case <-s.ch:
|
|
log.Println("disconnecting", conn.RemoteAddr())
|
|
return
|
|
default:
|
|
}
|
|
conn.SetDeadline(time.Now().Add(1e9))
|
|
//buf := make([]byte, 4096)
|
|
|
|
reader := bufio.NewReader(conn)
|
|
buf, err := reader.ReadBytes('#')
|
|
if nil != err {
|
|
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
|
|
continue
|
|
}
|
|
log.Println(err)
|
|
return
|
|
}
|
|
|
|
p, err := parseProtocol(buf)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
bp, _ := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
|
|
Database: "kiast",
|
|
Precision: "s",
|
|
})
|
|
// Create a point and add to batch
|
|
tags := map[string]string{}
|
|
fields := map[string]interface{}{
|
|
"time": p.Time,
|
|
"latitude": p.Latitude,
|
|
"longitude": p.Longitude,
|
|
"altitude": p.Altitude,
|
|
"speed": p.Speed,
|
|
"direction": p.Direction,
|
|
"fixed": p.Fixed,
|
|
"lte_status": p.LTEStatus,
|
|
"emm_error": p.EMMError,
|
|
"esm_error": p.ESMError,
|
|
"signal_strength": p.SignalStrength,
|
|
"satellite_count": p.SatelliteCount,
|
|
"satellite_signal_strength": p.SatelliteSignalStrength,
|
|
}
|
|
pt, err := influxdb.NewPoint("drone_gps", tags, fields, time.Now())
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
bp.AddPoint(pt)
|
|
|
|
s.influx.Write(bp)
|
|
}
|
|
}
|