This commit is contained in:
insanity@loafle.com 2017-05-16 18:03:29 +09:00
parent bdd1c4927f
commit 5694df0614
2 changed files with 20 additions and 27 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
cm "loafle.com/overflow/agent_api/config_manager" cm "loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/messages"
pb "loafle.com/overflow/crawler_go/grpc" pb "loafle.com/overflow/crawler_go/grpc"
q "loafle.com/overflow/queue_go" q "loafle.com/overflow/queue_go"
"log" "log"
@ -21,14 +22,6 @@ var (
once sync.Once once sync.Once
) )
type Data struct {
AgentId string
SensorId string
Data []byte
StartedAt uint64
FinishedAt uint64
}
type DataSender struct { type DataSender struct {
once sync.Once once sync.Once
queue *q.LoafleQueue queue *q.LoafleQueue
@ -36,7 +29,7 @@ type DataSender struct {
} }
func Start(ch chan bool, conf *cm.GlobalConfig) { func Start(ch chan bool, conf *cm.GlobalConfig) {
d := DataSender{} d := GetInstance()
d.start(conf) d.start(conf)
ch <- true ch <- true
} }
@ -54,12 +47,10 @@ func GetInstance() *DataSender {
} }
func (ds *DataSender) start(conf *cm.GlobalConfig) { func (ds *DataSender) start(conf *cm.GlobalConfig) {
ds.once.Do(func() {
qc := make(chan interface{}) qc := make(chan interface{})
ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc) ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc)
ds.gconf = conf ds.gconf = conf
go ds.handleData(qc) go ds.handleData(qc)
})
} }
func (ds *DataSender) stop() {} func (ds *DataSender) stop() {}
@ -68,10 +59,10 @@ func (ds *DataSender) handleData(qc chan interface{}) {
for { for {
select { select {
case items := <-qc: case items := <-qc:
result := make([]*Data, 0) result := make([]*messages.Data, 0)
for _, item := range items.([]*q.Item) { for _, item := range items.([]*q.Item) {
collectedData := item.Value.(*pb.Output) collectedData := item.Value.(*pb.Output)
d := &Data{} d := &messages.Data{}
d.Data = collectedData.Data d.Data = collectedData.Data
d.AgentId = agentIdentifier() d.AgentId = agentIdentifier()
result = append(result, d) result = append(result, d)
@ -82,14 +73,17 @@ func (ds *DataSender) handleData(qc chan interface{}) {
} }
func AddData(data interface{}) { func AddData(data interface{}) {
GetInstance().queue.PushItem(data) ds := GetInstance()
ds.queue.PushItem(data)
} }
func (ds *DataSender) send(data []*Data) { func (ds *DataSender) send(data []*messages.Data) {
for _, v := range data {
log.Printf("SEND SENSOR RESULT : %s - %s", v.SensorId, v.Data)
}
ds.addFailedData(data)
log.Println("TO CENTRAL: ", data)
//ds.addFailedData(data)
//addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port) //addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port)
//conn, err := grpc.Dial(addr, grpc.WithInsecure()) //conn, err := grpc.Dial(addr, grpc.WithInsecure())
//if err != nil { //if err != nil {
@ -105,19 +99,19 @@ func (ds *DataSender) send(data []*Data) {
// ds.saveFailedData(data) // ds.saveFailedData(data)
// return // return
//} //}
ds.removeFailed() //ds.removeFailed()
//log.Print(out) //log.Print(out)
} }
func (ds *DataSender) addFailedData(data []*Data) { func (ds *DataSender) addFailedData(data []*messages.Data) {
bytes := ds.getFailedData() bytes := ds.getFailedData()
if bytes != nil { if bytes != nil {
failed := Data{} failed := messages.Data{}
err := json.Unmarshal(bytes, &failed) err := json.Unmarshal(bytes, &failed)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
data = append([]*Data{&failed}, data...) //prepend data = append([]*messages.Data{&failed}, data...) //prepend
} }
} }
@ -137,7 +131,7 @@ func (ds *DataSender) removeFailed() {
} }
} }
func (ds *DataSender) saveFailedData(datas []*Data) { func (ds *DataSender) saveFailedData(datas []*messages.Data) {
file, err := tempFile() file, err := tempFile()
if err != nil { if err != nil {

View File

@ -2,7 +2,6 @@ package data_sender_go
import ( import (
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
"strconv" "strconv"
"testing" "testing"
"time" "time"