diff --git a/data_sender.go b/data_sender.go index b353fdc..a733684 100644 --- a/data_sender.go +++ b/data_sender.go @@ -4,6 +4,7 @@ import ( "encoding/json" "io/ioutil" cm "loafle.com/overflow/agent_api/config_manager" + "loafle.com/overflow/agent_api/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" "log" @@ -21,14 +22,6 @@ var ( once sync.Once ) -type Data struct { - AgentId string - SensorId string - Data []byte - StartedAt uint64 - FinishedAt uint64 -} - type DataSender struct { once sync.Once queue *q.LoafleQueue @@ -36,7 +29,7 @@ type DataSender struct { } func Start(ch chan bool, conf *cm.GlobalConfig) { - d := DataSender{} + d := GetInstance() d.start(conf) ch <- true } @@ -54,12 +47,10 @@ func GetInstance() *DataSender { } func (ds *DataSender) start(conf *cm.GlobalConfig) { - ds.once.Do(func() { - qc := make(chan interface{}) - ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc) - ds.gconf = conf - go ds.handleData(qc) - }) + qc := make(chan interface{}) + ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc) + ds.gconf = conf + go ds.handleData(qc) } func (ds *DataSender) stop() {} @@ -68,10 +59,10 @@ func (ds *DataSender) handleData(qc chan interface{}) { for { select { case items := <-qc: - result := make([]*Data, 0) + result := make([]*messages.Data, 0) for _, item := range items.([]*q.Item) { collectedData := item.Value.(*pb.Output) - d := &Data{} + d := &messages.Data{} d.Data = collectedData.Data d.AgentId = agentIdentifier() result = append(result, d) @@ -82,14 +73,17 @@ func (ds *DataSender) handleData(qc chan interface{}) { } func AddData(data interface{}) { - GetInstance().queue.PushItem(data) + ds := GetInstance() + ds.queue.PushItem(data) } -func (ds *DataSender) send(data []*Data) { +func (ds *DataSender) send(data []*messages.Data) { + for _, v := range data { + log.Printf("SEND SENSOR RESULT : %s - %s", v.SensorId, v.Data) + } - ds.addFailedData(data) - log.Println("TO CENTRAL: ", data) + //ds.addFailedData(data) //addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port) //conn, err := grpc.Dial(addr, grpc.WithInsecure()) //if err != nil { @@ -105,19 +99,19 @@ func (ds *DataSender) send(data []*Data) { // ds.saveFailedData(data) // return //} - ds.removeFailed() + //ds.removeFailed() //log.Print(out) } -func (ds *DataSender) addFailedData(data []*Data) { +func (ds *DataSender) addFailedData(data []*messages.Data) { bytes := ds.getFailedData() if bytes != nil { - failed := Data{} + failed := messages.Data{} err := json.Unmarshal(bytes, &failed) if err != nil { log.Println(err) } - data = append([]*Data{&failed}, data...) //prepend + data = append([]*messages.Data{&failed}, data...) //prepend } } @@ -137,7 +131,7 @@ func (ds *DataSender) removeFailed() { } } -func (ds *DataSender) saveFailedData(datas []*Data) { +func (ds *DataSender) saveFailedData(datas []*messages.Data) { file, err := tempFile() if err != nil { diff --git a/data_sender_test.go b/data_sender_test.go index 70eafc1..9cf3ac9 100644 --- a/data_sender_test.go +++ b/data_sender_test.go @@ -2,7 +2,6 @@ package data_sender_go import ( "loafle.com/overflow/agent_api/observer" - "loafle.com/overflow/agent_api/observer/messages" "strconv" "testing" "time"