2017-04-26 06:47:40 +00:00
|
|
|
package data_sender_go
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2017-04-26 09:07:44 +00:00
|
|
|
"encoding/json"
|
2017-04-26 11:00:27 +00:00
|
|
|
"google.golang.org/grpc"
|
|
|
|
"io/ioutil"
|
|
|
|
"loafle.com/overflow/agent_api/observer"
|
2017-04-26 09:07:44 +00:00
|
|
|
pb "loafle.com/overflow/crawler_go/grpc"
|
|
|
|
"loafle.com/overflow/cron_go"
|
|
|
|
q "loafle.com/overflow/queue_go"
|
2017-04-26 11:00:27 +00:00
|
|
|
"log"
|
2017-04-26 06:47:40 +00:00
|
|
|
"os"
|
2017-04-26 09:07:44 +00:00
|
|
|
"sync"
|
2017-04-26 06:47:40 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Settings used by DataSender.
const (
	// CENTRAL_ADDR is the host:port target that send dials over gRPC.
	CENTRAL_ADDR = "127.0.0.1:50052"

	// FILE_PATH is the file that holds records which could not be delivered.
	FILE_PATH = "/overflow/tmp/data.tmp"

	// SENDER_ID is the task name under which the periodic check is registered
	// with the cron scheduler.
	SENDER_ID = "OVERFLOW_DATA_SENDER"

	// DEFAULT_INTERVAL is the interval handed to the cron scheduler for the
	// periodic check. NOTE(review): the unit is defined by cron_go (presumably
	// seconds) - confirm.
	DEFAULT_INTERVAL uint64 = 5
)
|
|
|
|
|
2017-04-26 09:07:44 +00:00
|
|
|
// Data is the record type that DataSender takes off the queue, stamps with the
// agent identifier and serializes as JSON when delivery fails.
//
// The field order is significant: encoding/json emits fields in declaration
// order, so it determines the layout of the persisted records.
type Data struct {
	// AgentId is filled in by DataSender.check; SensorId is supplied by the
	// producer of the record.
	AgentId, SensorId string

	// Data is the string-keyed payload of the record.
	Data map[string]string

	// StartedAt and FinishedAt bracket the collection.
	// NOTE(review): presumably timestamps; the unit is not visible here.
	StartedAt, FinishedAt uint64
}
|
|
|
|
|
|
|
|
type DataSender struct {
|
|
|
|
once sync.Once
|
|
|
|
runStat chan bool
|
|
|
|
queue *q.LoafleQueue
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ds *DataSender) Start() {
|
|
|
|
ds.once.Do(func() {
|
|
|
|
ds.init()
|
|
|
|
})
|
|
|
|
}
|
|
|
|
func (ds *DataSender) Stop() {
|
|
|
|
ds.runStat <- false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ds *DataSender) init() {
|
2017-04-26 11:00:27 +00:00
|
|
|
ds.queue = q.NewQueue(observer.DATA_QUEUE, 3)
|
2017-04-26 09:07:44 +00:00
|
|
|
|
|
|
|
cr := &cron.Cron{}
|
|
|
|
ds.runStat = cr.Start()
|
|
|
|
cr.AddTask(SENDER_ID, DEFAULT_INTERVAL).Invoke(ds.check)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ds *DataSender) check() {
|
|
|
|
items := ds.queue.GetItems()
|
|
|
|
len := len(items)
|
|
|
|
if len <= 0 {
|
|
|
|
return
|
|
|
|
}
|
2017-04-26 11:00:27 +00:00
|
|
|
result := make([]*Data, 0)
|
2017-04-26 09:07:44 +00:00
|
|
|
for _, item := range items {
|
|
|
|
collectedData := item.Value.(*Data)
|
|
|
|
collectedData.AgentId = agentIdentifier()
|
|
|
|
result = append(result, collectedData)
|
|
|
|
}
|
|
|
|
|
|
|
|
ds.send(result)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ds *DataSender) send(data []*Data) {
|
2017-04-26 11:00:27 +00:00
|
|
|
|
|
|
|
bytes := ds.getFailedData()
|
|
|
|
if bytes != nil {
|
|
|
|
failed := Data{}
|
|
|
|
err := json.Unmarshal(bytes, &failed)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
data = append([]*Data{&failed}, data...) //prepend
|
|
|
|
ds.removeFailed()
|
|
|
|
}
|
|
|
|
|
2017-04-26 06:47:40 +00:00
|
|
|
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
|
|
|
|
if err != nil {
|
2017-04-26 11:00:27 +00:00
|
|
|
ds.saveFailedData(data)
|
2017-04-26 06:47:40 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
//temporary
|
|
|
|
client := pb.NewStatusClient(conn)
|
|
|
|
out, err := client.Status(context.Background(), &pb.Empty{})
|
|
|
|
if err != nil {
|
2017-04-26 11:00:27 +00:00
|
|
|
ds.saveFailedData(data)
|
2017-04-26 06:47:40 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
log.Print(out)
|
|
|
|
}
|
|
|
|
|
2017-04-26 11:00:27 +00:00
|
|
|
func (ds *DataSender) getFailedData() []byte {
|
|
|
|
b, err := ioutil.ReadFile(FILE_PATH)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ds *DataSender) removeFailed() {
|
|
|
|
err := os.Remove(FILE_PATH)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ds *DataSender) saveFailedData(datas []*Data) {
|
2017-04-26 06:47:40 +00:00
|
|
|
|
|
|
|
file, err := tempFile()
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
if err := file.Close(); err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2017-04-26 09:07:44 +00:00
|
|
|
for _, data := range datas {
|
|
|
|
bytes, err := json.Marshal(&data)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
2017-04-26 09:48:01 +00:00
|
|
|
if bytes != nil {
|
2017-04-26 11:00:27 +00:00
|
|
|
log.Println("write : ", string(bytes))
|
2017-04-26 09:48:01 +00:00
|
|
|
_, err = file.Write(bytes)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
2017-04-26 09:07:44 +00:00
|
|
|
}
|
2017-04-26 06:47:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func tempFile() (*os.File, error) {
|
|
|
|
|
|
|
|
var file *os.File
|
|
|
|
var fileInfo os.FileInfo
|
|
|
|
var err error
|
|
|
|
if fileInfo, err = os.Stat(FILE_PATH); err != nil {
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
file, err = os.Create(FILE_PATH)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
2017-04-26 09:07:44 +00:00
|
|
|
} else {
|
2017-04-26 06:47:40 +00:00
|
|
|
if fileInfo != nil {
|
2017-04-26 09:07:44 +00:00
|
|
|
file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660)
|
2017-04-26 06:47:40 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return file, nil
|
2017-04-26 09:07:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// agentIdentifier returns the identifier stamped on outgoing records. It is
// currently a fixed value.
func agentIdentifier() string {
	const fixedID = "agentID_000000001"
	return fixedID
}
|