data_sender_go/data_sender.go

188 lines
3.5 KiB
Go
Raw Normal View History

2017-04-26 06:47:40 +00:00
package data_sender_go
import (
2017-04-26 09:07:44 +00:00
"encoding/json"
2017-04-26 11:00:27 +00:00
"io/ioutil"
2017-04-27 11:22:19 +00:00
cm "loafle.com/overflow/agent_api/config_manager"
2017-05-16 09:03:29 +00:00
"loafle.com/overflow/agent_api/messages"
2017-04-26 09:07:44 +00:00
pb "loafle.com/overflow/crawler_go/grpc"
q "loafle.com/overflow/queue_go"
2017-04-26 11:00:27 +00:00
"log"
2017-04-26 06:47:40 +00:00
"os"
2017-04-26 09:07:44 +00:00
"sync"
2017-04-26 06:47:40 +00:00
)
const (
	// FILE_PATH is the file that tempFile opens for appending and that
	// getFailedData and removeFailed read and delete.
	FILE_PATH = "/overflow/tmp/data.tmp"
	// DEFAULT_INTERVAL is passed to q.NewQueue as its first argument when the
	// sender starts.
	// NOTE(review): the unit is defined by queue_go and is not visible here — confirm.
	DEFAULT_INTERVAL = 10
)
var (
instance *DataSender
once sync.Once
2017-04-26 06:47:40 +00:00
)
2017-04-26 09:07:44 +00:00
type DataSender struct {
2017-04-27 11:22:19 +00:00
once sync.Once
queue *q.LoafleQueue
gconf *cm.GlobalConfig
2017-04-26 09:07:44 +00:00
}
2017-05-15 10:07:59 +00:00
func Start(ch chan bool, conf *cm.GlobalConfig) {
2017-05-16 09:03:29 +00:00
d := GetInstance()
2017-05-15 10:07:59 +00:00
d.start(conf)
ch <- true
}
func Stop(ch chan bool) {
GetInstance().stop()
ch <- true
2017-04-26 09:07:44 +00:00
}
2017-04-27 11:22:19 +00:00
func GetInstance() *DataSender {
once.Do(func() {
instance = &DataSender{}
})
return instance
2017-04-26 09:07:44 +00:00
}
2017-05-15 10:07:59 +00:00
func (ds *DataSender) start(conf *cm.GlobalConfig) {
2017-05-16 09:03:29 +00:00
qc := make(chan interface{})
ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc)
ds.gconf = conf
go ds.handleData(qc)
2017-04-26 09:07:44 +00:00
}
2017-05-11 09:58:19 +00:00
2017-05-15 10:07:59 +00:00
// stop currently performs no work: no shutdown logic is implemented, so the
// goroutine launched by start keeps running after this returns.
func (ds *DataSender) stop() {
}
2017-04-26 11:00:27 +00:00
2017-04-27 11:22:19 +00:00
func (ds *DataSender) handleData(qc chan interface{}) {
for {
select {
case items := <-qc:
2017-05-16 09:03:29 +00:00
result := make([]*messages.Data, 0)
2017-04-27 11:22:19 +00:00
for _, item := range items.([]*q.Item) {
2017-05-15 10:07:59 +00:00
collectedData := item.Value.(*pb.Output)
2017-05-16 09:03:29 +00:00
d := &messages.Data{}
2017-05-15 10:07:59 +00:00
d.Data = collectedData.Data
d.AgentId = agentIdentifier()
result = append(result, d)
2017-04-27 11:22:19 +00:00
}
ds.send(result)
2017-04-26 11:00:27 +00:00
}
}
2017-04-27 11:22:19 +00:00
}
2017-05-15 10:07:59 +00:00
func AddData(data interface{}) {
2017-05-16 09:03:29 +00:00
ds := GetInstance()
ds.queue.PushItem(data)
2017-05-15 10:07:59 +00:00
}
2017-05-16 09:03:29 +00:00
func (ds *DataSender) send(data []*messages.Data) {
for _, v := range data {
log.Printf("SEND SENSOR RESULT : %s - %s", v.SensorId, v.Data)
}
2017-04-27 11:22:19 +00:00
2017-05-16 09:03:29 +00:00
//ds.addFailedData(data)
2017-05-15 10:07:59 +00:00
//addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port)
//conn, err := grpc.Dial(addr, grpc.WithInsecure())
//if err != nil {
// ds.saveFailedData(data)
// return
//}
//defer conn.Close()
2017-04-26 06:47:40 +00:00
//temporary
2017-05-15 10:07:59 +00:00
//client := pb.NewStatusClient(conn)
//out, err := client.Status(context.Background(), &pb.Empty{})
//if err != nil {
// ds.saveFailedData(data)
// return
//}
2017-05-16 09:03:29 +00:00
//ds.removeFailed()
2017-05-15 10:07:59 +00:00
//log.Print(out)
2017-04-26 06:47:40 +00:00
}
2017-05-16 09:03:29 +00:00
func (ds *DataSender) addFailedData(data []*messages.Data) {
2017-04-27 11:22:19 +00:00
bytes := ds.getFailedData()
if bytes != nil {
2017-05-16 09:03:29 +00:00
failed := messages.Data{}
2017-04-27 11:22:19 +00:00
err := json.Unmarshal(bytes, &failed)
if err != nil {
log.Println(err)
}
2017-05-16 09:03:29 +00:00
data = append([]*messages.Data{&failed}, data...) //prepend
2017-04-27 11:22:19 +00:00
}
}
2017-04-26 11:00:27 +00:00
// getFailedData reads the whole of FILE_PATH and returns its contents. When
// the read fails the error is logged and nil is returned.
func (ds *DataSender) getFailedData() []byte {
	contents, readErr := ioutil.ReadFile(FILE_PATH)
	if readErr == nil {
		return contents
	}
	log.Println(readErr)
	return nil
}
// removeFailed deletes FILE_PATH. A failure to remove the file is logged and
// otherwise ignored.
func (ds *DataSender) removeFailed() {
	if rmErr := os.Remove(FILE_PATH); rmErr != nil {
		log.Println(rmErr)
	}
}
2017-05-16 09:03:29 +00:00
func (ds *DataSender) saveFailedData(datas []*messages.Data) {
2017-04-26 06:47:40 +00:00
file, err := tempFile()
if err != nil {
log.Println(err)
}
defer func() {
if err := file.Close(); err != nil {
log.Println(err)
return
}
}()
2017-04-26 09:07:44 +00:00
for _, data := range datas {
bytes, err := json.Marshal(&data)
if err != nil {
log.Println(err)
}
2017-04-26 09:48:01 +00:00
if bytes != nil {
2017-04-26 11:00:27 +00:00
log.Println("write : ", string(bytes))
2017-04-26 09:48:01 +00:00
_, err = file.Write(bytes)
if err != nil {
log.Println(err)
}
2017-04-26 09:07:44 +00:00
}
2017-04-26 06:47:40 +00:00
}
}
func tempFile() (*os.File, error) {
var file *os.File
var fileInfo os.FileInfo
var err error
if fileInfo, err = os.Stat(FILE_PATH); err != nil {
if os.IsNotExist(err) {
file, err = os.Create(FILE_PATH)
if err != nil {
return nil, err
}
}
2017-04-26 09:07:44 +00:00
} else {
2017-04-26 06:47:40 +00:00
if fileInfo != nil {
2017-04-26 09:07:44 +00:00
file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660)
2017-04-26 06:47:40 +00:00
if err != nil {
return nil, err
}
}
}
return file, nil
2017-04-26 09:07:44 +00:00
}
// agentIdentifier returns the identifier stamped on every outgoing record.
// The value is a fixed string.
func agentIdentifier() string {
	const id = "agentID_000000001"
	return id
}