data_sender_go/data_sender.go

145 lines
2.5 KiB
Go
Raw Normal View History

2017-04-26 06:47:40 +00:00
package data_sender_go
import (
"google.golang.org/grpc"
"log"
"context"
2017-04-26 09:07:44 +00:00
"encoding/json"
2017-04-26 09:48:01 +00:00
"fmt"
2017-04-26 09:07:44 +00:00
pb "loafle.com/overflow/crawler_go/grpc"
"loafle.com/overflow/cron_go"
q "loafle.com/overflow/queue_go"
2017-04-26 06:47:40 +00:00
"os"
2017-04-26 09:07:44 +00:00
"sync"
2017-04-26 06:47:40 +00:00
)
const (
2017-04-26 09:07:44 +00:00
CENTRAL_ADDR = "127.0.0.1:50052"
FILE_PATH = "/home/insanity/data/temp"
SENDER_ID = "OVERFLOW_DATA_SENDER"
DEFAULT_INTERVAL = uint64(5)
2017-04-26 06:47:40 +00:00
)
2017-04-26 09:07:44 +00:00
type Data struct {
AgentId string
SensorId string
Data map[string]string
StartedAt uint64
FinishedAt uint64
}
type DataSender struct {
once sync.Once
runStat chan bool
queue *q.LoafleQueue
}
func (ds *DataSender) Start() {
ds.once.Do(func() {
ds.init()
})
}
func (ds *DataSender) Stop() {
ds.runStat <- false
}
func (ds *DataSender) init() {
ds.queue = q.NewQueue(q.EVENT_TYPE, 3)
cr := &cron.Cron{}
ds.runStat = cr.Start()
cr.AddTask(SENDER_ID, DEFAULT_INTERVAL).Invoke(ds.check)
}
func (ds *DataSender) check() {
items := ds.queue.GetItems()
len := len(items)
if len <= 0 {
return
}
result := make([]*Data, len)
for _, item := range items {
collectedData := item.Value.(*Data)
collectedData.AgentId = agentIdentifier()
result = append(result, collectedData)
}
ds.send(result)
}
func (ds *DataSender) send(data []*Data) {
2017-04-26 06:47:40 +00:00
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil {
saveFailedData(data)
return
}
defer conn.Close()
//temporary
client := pb.NewStatusClient(conn)
out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil {
saveFailedData(data)
return
}
log.Print(out)
}
2017-04-26 09:07:44 +00:00
func saveFailedData(datas []*Data) {
2017-04-26 06:47:40 +00:00
file, err := tempFile()
if err != nil {
log.Println(err)
}
defer func() {
if err := file.Close(); err != nil {
log.Println(err)
return
}
}()
2017-04-26 09:07:44 +00:00
for _, data := range datas {
bytes, err := json.Marshal(&data)
if err != nil {
log.Println(err)
}
2017-04-26 09:48:01 +00:00
if bytes != nil {
fmt.Println("write : ", string(bytes))
_, err = file.Write(bytes)
if err != nil {
log.Println(err)
}
2017-04-26 09:07:44 +00:00
}
2017-04-26 06:47:40 +00:00
}
}
func tempFile() (*os.File, error) {
var file *os.File
var fileInfo os.FileInfo
var err error
if fileInfo, err = os.Stat(FILE_PATH); err != nil {
if os.IsNotExist(err) {
file, err = os.Create(FILE_PATH)
if err != nil {
return nil, err
}
}
2017-04-26 09:07:44 +00:00
} else {
2017-04-26 06:47:40 +00:00
if fileInfo != nil {
2017-04-26 09:07:44 +00:00
file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660)
2017-04-26 06:47:40 +00:00
if err != nil {
return nil, err
}
}
}
return file, nil
2017-04-26 09:07:44 +00:00
}
func agentIdentifier() string {
return "agentID_000000001"
}