diff --git a/event_sender.go b/event_sender.go index e69de29..8a46a17 100644 --- a/event_sender.go +++ b/event_sender.go @@ -0,0 +1,92 @@ +package event_sender + +import ( + "context" + "google.golang.org/grpc" + "loafle.com/overflow/agent_api/observer/messages" + pb "loafle.com/overflow/crawler_go/grpc" + q "loafle.com/overflow/queue_go" + "log" + "reflect" +) + +const ( + CENTRAL_ADDR = "192.168.1.105:50052" + SENDER_ID = "OVERFLOW_EVENT_SENDER" + DEFAULT_INTERVAL = 1 +) + +type Data struct { + AgentId string + SensorId string + Data map[string]string + StartedAt uint64 + FinishedAt uint64 +} + +type EventSender struct { + lq *q.LoafleQueue + sc chan interface{} +} + +func (es *EventSender) Start() { + es.sc = make(chan interface{}, 0) + + es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc) + + go es.checkQueue() +} + +func (es *EventSender) Stop() { + if es.sc != nil { + close(es.sc) + } +} + +func (es *EventSender) checkQueue() { + + result := make([]*Data, 0) + + for sc := range es.sc { + items := reflect.ValueOf(sc) + if items.Kind() != reflect.Slice { + log.Println("ddddd") + } + + for i := 0; i < items.Len(); i++ { + item := items.Index(i).Elem().Interface() + tempCollectedData := item.(q.Item) + collectedData := tempCollectedData.Value.(*Data) + collectedData.AgentId = agentIdentifier() + result = append(result, collectedData) + log.Println("Result Len: ", len(result)) + } + es.send(result) + result = nil + } + +} + +func (es *EventSender) send(data []*Data) { + log.Println("Send Started") + conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) + + if err != nil { + log.Fatal("Connection Error :", err.Error()) + return + } + defer conn.Close() + + client := pb.NewStatusClient(conn) + out, err := client.Status(context.Background(), &pb.Empty{}) + + if err != nil { + log.Fatal("client Error:", err.Error()) + return + } + log.Print(out) +} + +func agentIdentifier() string { + return "agentID_000000001" +}