diff --git a/event_sender.go b/event_sender.go index 8a46a17..e69de29 100644 --- a/event_sender.go +++ b/event_sender.go @@ -1,92 +0,0 @@ -package event_sender - -import ( - "context" - "google.golang.org/grpc" - "loafle.com/overflow/agent_api/observer/messages" - pb "loafle.com/overflow/crawler_go/grpc" - q "loafle.com/overflow/queue_go" - "log" - "reflect" -) - -const ( - CENTRAL_ADDR = "192.168.1.105:50052" - SENDER_ID = "OVERFLOW_EVENT_SENDER" - DEFAULT_INTERVAL = 1 -) - -type Data struct { - AgentId string - SensorId string - Data map[string]string - StartedAt uint64 - FinishedAt uint64 -} - -type EventSender struct { - lq *q.LoafleQueue - sc chan interface{} -} - -func (es *EventSender) Start() { - es.sc = make(chan interface{}, 0) - - es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc) - - go es.checkQueue() -} - -func (es *EventSender) Stop() { - if es.sc != nil { - close(es.sc) - } -} - -func (es *EventSender) checkQueue() { - - result := make([]*Data, 0) - - for sc := range es.sc { - items := reflect.ValueOf(sc) - if items.Kind() != reflect.Slice { - log.Println("ddddd") - } - - for i := 0; i < items.Len(); i++ { - item := items.Index(i).Elem().Interface() - tempCollectedData := item.(q.Item) - collectedData := tempCollectedData.Value.(*Data) - collectedData.AgentId = agentIdentifier() - result = append(result, collectedData) - log.Println("Result Len: ", len(result)) - } - es.send(result) - result = nil - } - -} - -func (es *EventSender) send(data []*Data) { - log.Println("Send Started") - conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) - - if err != nil { - log.Fatal("Connection Error :", err.Error()) - return - } - defer conn.Close() - - client := pb.NewStatusClient(conn) - out, err := client.Status(context.Background(), &pb.Empty{}) - - if err != nil { - log.Fatal("client Error:", err.Error()) - return - } - log.Print(out) -} - -func agentIdentifier() string { - return "agentID_000000001" -}