From 1370a0c821ee39cb1b635333c008dc8269124cee Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Mon, 15 May 2017 19:07:59 +0900 Subject: [PATCH] data_sender --- data_sender.go | 78 +++++++++++++++++++++----------------------- data_sender_event.go | 20 ------------ 2 files changed, 38 insertions(+), 60 deletions(-) delete mode 100644 data_sender_event.go diff --git a/data_sender.go b/data_sender.go index afabb45..41af16b 100644 --- a/data_sender.go +++ b/data_sender.go @@ -1,12 +1,9 @@ package data_sender_go import ( - "context" "encoding/json" - "google.golang.org/grpc" "io/ioutil" cm "loafle.com/overflow/agent_api/config_manager" - "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" "log" @@ -27,7 +24,7 @@ var ( type Data struct { AgentId string SensorId string - Data map[string]string + Data []byte StartedAt uint64 FinishedAt uint64 } @@ -38,8 +35,15 @@ type DataSender struct { gconf *cm.GlobalConfig } -func init() { - go handleConfigLoaded() +func Start(ch chan bool, conf *cm.GlobalConfig) { + d := DataSender{} + d.start(conf) + ch <- true +} + +func Stop(ch chan bool) { + GetInstance().stop() + ch <- true } func GetInstance() *DataSender { @@ -49,30 +53,17 @@ func GetInstance() *DataSender { return instance } -func startDataSender(ch chan interface{}) { - ds := GetInstance() - go func() { - data := <-ch - log.Println(data) - ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() - ds.start() - ds.addObservers() - }() -} - -func (ds *DataSender) start() { +func (ds *DataSender) start(conf *cm.GlobalConfig) { ds.once.Do(func() { qc := make(chan interface{}) - ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc) + ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc) + ds.gconf = conf go ds.handleData(qc) }) } -func (ds *DataSender) addObservers() { - go ds.handleAgentStop() -} +func (ds *DataSender) stop() {} -func (ds *DataSender) Stop() {} func (ds *DataSender) handleData(qc chan interface{}) { for { @@ -80,36 +71,43 @@ func (ds *DataSender) handleData(qc chan interface{}) { case items := <-qc: result := make([]*Data, 0) for _, item := range items.([]*q.Item) { - collectedData := item.Value.(*Data) - collectedData.AgentId = agentIdentifier() - result = append(result, collectedData) + collectedData := item.Value.(*pb.Output) + d := &Data{} + d.Data = collectedData.Data + d.AgentId = agentIdentifier() + result = append(result, d) } ds.send(result) } } } +func AddData(data interface{}) { + GetInstance().queue.PushItem(data) +} + func (ds *DataSender) send(data []*Data) { ds.addFailedData(data) + log.Println("TO CENTRAL: ", data) - addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port) - conn, err := grpc.Dial(addr, grpc.WithInsecure()) - if err != nil { - ds.saveFailedData(data) - return - } - defer conn.Close() + //addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port) + //conn, err := grpc.Dial(addr, grpc.WithInsecure()) + //if err != nil { + // ds.saveFailedData(data) + // return + //} + //defer conn.Close() //temporary - client := pb.NewStatusClient(conn) - out, err := client.Status(context.Background(), &pb.Empty{}) - if err != nil { - ds.saveFailedData(data) - return - } + //client := pb.NewStatusClient(conn) + //out, err := client.Status(context.Background(), &pb.Empty{}) + //if err != nil { + // ds.saveFailedData(data) + // return + //} ds.removeFailed() - log.Print(out) + //log.Print(out) } func (ds *DataSender) addFailedData(data []*Data) { diff --git a/data_sender_event.go b/data_sender_event.go deleted file mode 100644 index bef445e..0000000 --- a/data_sender_event.go +++ /dev/null @@ -1,20 +0,0 @@ -package data_sender_go - -import ( - "loafle.com/overflow/agent_api/observer" - "loafle.com/overflow/agent_api/observer/messages" -) - -func handleConfigLoaded() { - ch := make(chan interface{}, 0) - observer.Add(messages.CFG_LOADED, ch) - startDataSender(ch) - //observer.Remove(messages.CFG_LOADED, ch) -} - -func (ds *DataSender) handleAgentStop() { - ch := make(chan interface{}, 0) - observer.Add(messages.AGT_STOPPED, ch) - ds.Stop() - //observer.Remove(messages.AGT_STOPPED, ch) -}