From 407e1e5f70f0693d1060158660900fbd55973054 Mon Sep 17 00:00:00 2001 From: geek Date: Thu, 27 Apr 2017 18:58:50 +0900 Subject: [PATCH] event sender shared --- event_sender.go | 92 ++++++++++++++++++++++++++++++++++++++++++++ event_sender_test.go | 36 +++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 event_sender.go create mode 100644 event_sender_test.go diff --git a/event_sender.go b/event_sender.go new file mode 100644 index 0000000..8a46a17 --- /dev/null +++ b/event_sender.go @@ -0,0 +1,92 @@ +package event_sender + +import ( + "context" + "google.golang.org/grpc" + "loafle.com/overflow/agent_api/observer/messages" + pb "loafle.com/overflow/crawler_go/grpc" + q "loafle.com/overflow/queue_go" + "log" + "reflect" +) + +const ( + CENTRAL_ADDR = "192.168.1.105:50052" + SENDER_ID = "OVERFLOW_EVENT_SENDER" + DEFAULT_INTERVAL = 1 +) + +type Data struct { + AgentId string + SensorId string + Data map[string]string + StartedAt uint64 + FinishedAt uint64 +} + +type EventSender struct { + lq *q.LoafleQueue + sc chan interface{} +} + +func (es *EventSender) Start() { + es.sc = make(chan interface{}, 0) + + es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc) + + go es.checkQueue() +} + +func (es *EventSender) Stop() { + if es.sc != nil { + close(es.sc) + } +} + +func (es *EventSender) checkQueue() { + + result := make([]*Data, 0) + + for sc := range es.sc { + items := reflect.ValueOf(sc) + if items.Kind() != reflect.Slice { + log.Println("ddddd") + } + + for i := 0; i < items.Len(); i++ { + item := items.Index(i).Elem().Interface() + tempCollectedData := item.(q.Item) + collectedData := tempCollectedData.Value.(*Data) + collectedData.AgentId = agentIdentifier() + result = append(result, collectedData) + log.Println("Result Len: ", len(result)) + } + es.send(result) + result = nil + } + +} + +func (es *EventSender) send(data []*Data) { + log.Println("Send Started") + conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) + + if err != nil { + log.Fatal("Connection Error :", err.Error()) + return + } + defer conn.Close() + + client := pb.NewStatusClient(conn) + out, err := client.Status(context.Background(), &pb.Empty{}) + + if err != nil { + log.Fatal("client Error:", err.Error()) + return + } + log.Print(out) +} + +func agentIdentifier() string { + return "agentID_000000001" +} diff --git a/event_sender_test.go b/event_sender_test.go new file mode 100644 index 0000000..6183bc6 --- /dev/null +++ b/event_sender_test.go @@ -0,0 +1,36 @@ +package event_sender + +import ( + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" + "log" + "testing" + "time" +) + +func TestEventSender_Start(t *testing.T) { + + es := &EventSender{} + es.Start() + testNotify() + + time.Sleep(time.Second * 100) + +} + +func testNotify() { + //time.Sleep(time.Second * 5) + + result := make(map[string]string) + result["ab"] = "123" + result["cd"] = "456" + result["ef"] = "789" + + cd := &Data{ + SensorId: "insanity", + Data: result, + } + + log.Println("New data Notify") + observer.Notify(messages.QUEUE_EVENT, cd) +}