package event_sender import ( "context" "google.golang.org/grpc" "loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" "log" "reflect" "sync" "time" ) var ( instance *EventSender once sync.Once ) const ( CENTRAL_ADDR = "192.168.1.105:50052" SENDER_ID = "OVERFLOW_EVENT_SENDER" DEFAULT_INTERVAL = 1 ) type Data struct { AgentId string SensorId string Data map[string]string StartedAt uint64 FinishedAt uint64 } func GetInstance() *EventSender { once.Do(func() { instance = &EventSender{} }) return instance } func init() { ch := make(chan interface{}, 0) ch1 := make(chan interface{}, 0) observer.Add(messages.CFG_LOADED, ch) observer.Add(messages.SCF_STOPPED,ch1) handleInit(ch, ch1) } func handleInit(ch chan interface{}, ch1 chan interface{}) { es := GetInstance() go func() { data := <-ch log.Println("handleInit", data) //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() es.start() observer.Remove(messages.CFG_LOADED, ch) }() go func() { data := <- ch1 observer.Notify(messages.QUEUE_DATA, data) time.Sleep(2*time.Second) es.Stop() }() } type EventSender struct { once sync.Once lq *q.LoafleQueue sc chan interface{} } func (es *EventSender) start() { es.sc = make(chan interface{}, 0) es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc) go es.checkQueue() } func (es *EventSender) Stop() { if es.sc != nil { close(es.sc) es.sc = nil } } func (es *EventSender) checkQueue() { result := make([]*Data, 0) for sc := range es.sc { items := reflect.ValueOf(sc) if items.Kind() != reflect.Slice { log.Println("ddddd") } for i := 0; i < items.Len(); i++ { item := items.Index(i).Elem().Interface() tempCollectedData := item.(q.Item) collectedData := tempCollectedData.Value.(*Data) collectedData.AgentId = agentIdentifier() result = append(result, collectedData) //log.Println("Result Len: ", len(result)) } es.send(result) result = nil } } func (es *EventSender) send(data []*Data) { log.Println("Send Started") conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) if err != nil { log.Fatal("Connection Error :", err.Error()) return } defer conn.Close() client := pb.NewStatusClient(conn) out, err := client.Status(context.Background(), &pb.Empty{}) if err != nil { log.Fatal("client Error:", err.Error()) return } log.Print(out) } func agentIdentifier() string { return "agentID_000000001" }