From 26f82a78b0f07ad7fa447fd0b628deed459a7874 Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Thu, 27 Apr 2017 20:22:19 +0900 Subject: [PATCH] data_sender --- data_sender.go | 111 ++++++++++++++++++++++++++++---------------- data_sender_test.go | 51 +++++++++++++++----- 2 files changed, 110 insertions(+), 52 deletions(-) diff --git a/data_sender.go b/data_sender.go index 6c0d828..ee4cf2e 100644 --- a/data_sender.go +++ b/data_sender.go @@ -5,9 +5,10 @@ import ( "encoding/json" "google.golang.org/grpc" "io/ioutil" + cm "loafle.com/overflow/agent_api/config_manager" "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" - "loafle.com/overflow/cron_go" q "loafle.com/overflow/queue_go" "log" "os" @@ -17,8 +18,12 @@ import ( const ( CENTRAL_ADDR = "127.0.0.1:50052" FILE_PATH = "/overflow/tmp/data.tmp" - SENDER_ID = "OVERFLOW_DATA_SENDER" - DEFAULT_INTERVAL = uint64(5) + DEFAULT_INTERVAL = 10 +) + +var ( + instance *DataSender + once sync.Once ) type Data struct { @@ -30,56 +35,69 @@ type Data struct { } type DataSender struct { - once sync.Once - runStat chan bool - queue *q.LoafleQueue + once sync.Once + queue *q.LoafleQueue + gconf *cm.GlobalConfig } -func (ds *DataSender) Start() { +func init() { + addObservers() +} + +func GetInstance() *DataSender { + once.Do(func() { + instance = &DataSender{} + }) + return instance +} + +func addObservers() { + ch := make(chan interface{}, 0) + observer.Add(messages.CONFIGMANAGER_LOADED, ch) + handleInit(ch) +} + +func handleInit(ch chan interface{}) { + ds := GetInstance() + go func() { + data := <-ch + log.Println(data) + //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() + ds.start() + close(ch) + }() +} + +func (ds *DataSender) start() { ds.once.Do(func() { - ds.init() + qc := make(chan interface{}) + ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc) + + go ds.handleData(qc) }) } func (ds *DataSender) Stop() { - ds.runStat <- false + } -func (ds *DataSender) init() { - ds.queue = q.NewQueue(observer.DATA_QUEUE, 3) - - cr := &cron.Cron{} - ds.runStat = cr.Start() - cr.AddTask(SENDER_ID, DEFAULT_INTERVAL).Invoke(ds.check) -} - -func (ds *DataSender) check() { - items := ds.queue.GetItems() - len := len(items) - if len <= 0 { - return +func (ds *DataSender) handleData(qc chan interface{}) { + for { + select { + case items := <-qc: + result := make([]*Data, 0) + for _, item := range items.([]*q.Item) { + collectedData := item.Value.(*Data) + collectedData.AgentId = agentIdentifier() + result = append(result, collectedData) + } + ds.send(result) + } } - result := make([]*Data, 0) - for _, item := range items { - collectedData := item.Value.(*Data) - collectedData.AgentId = agentIdentifier() - result = append(result, collectedData) - } - - ds.send(result) } func (ds *DataSender) send(data []*Data) { - bytes := ds.getFailedData() - if bytes != nil { - failed := Data{} - err := json.Unmarshal(bytes, &failed) - if err != nil { - log.Println(err) - } - data = append([]*Data{&failed}, data...) //prepend - ds.removeFailed() - } + ds.addFailedData(data) conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) if err != nil { @@ -95,9 +113,22 @@ func (ds *DataSender) send(data []*Data) { ds.saveFailedData(data) return } + ds.removeFailed() log.Print(out) } +func (ds *DataSender) addFailedData(data []*Data) { + bytes := ds.getFailedData() + if bytes != nil { + failed := Data{} + err := json.Unmarshal(bytes, &failed) + if err != nil { + log.Println(err) + } + data = append([]*Data{&failed}, data...) //prepend + } +} + func (ds *DataSender) getFailedData() []byte { b, err := ioutil.ReadFile(FILE_PATH) if err != nil { diff --git a/data_sender_test.go b/data_sender_test.go index ad8ddf1..70eafc1 100644 --- a/data_sender_test.go +++ b/data_sender_test.go @@ -1,8 +1,9 @@ package data_sender_go import ( - "fmt" "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" + "strconv" "testing" "time" ) @@ -11,28 +12,54 @@ type Result struct { Data []byte } -func TestSend(t *testing.T) { +func TestTotal(t *testing.T) { - ds := &DataSender{} - ds.Start() - testNotify() + time.Sleep(time.Second * 5) + + observer.Notify("CONFIGMANAGER_LOADED", nil) + + time.Sleep(time.Second * 5) + + for i := 0; i < 20; i++ { + testNotify(strconv.Itoa(i)) + } + + time.Sleep(time.Second * 12) + for i := 20; i < 30; i++ { + testNotify(strconv.Itoa(i)) + } time.Sleep(time.Second * 100) } -func testNotify() { - time.Sleep(time.Second * 5) +func TestSend(t *testing.T) { + + ds := &DataSender{} + ds.start() + + for i := 0; i < 20; i++ { + testNotify(strconv.Itoa(i)) + } + + time.Sleep(time.Second * 12) + for i := 20; i < 30; i++ { + testNotify(strconv.Itoa(i)) + } + + time.Sleep(time.Second * 100) +} + +func testNotify(val string) { result := make(map[string]string) - result["ab"] = "123" - result["cd"] = "456" - result["ef"] = "789" + result["a"] = val + result["b"] = val + result["c"] = val cd := &Data{ SensorId: "insanity", Data: result, } - fmt.Println("New data Notify") - observer.Notify(observer.DATA_QUEUE.String(), cd) + observer.Notify(messages.QUEUE_DATA, cd) }