package event_sender_go import ( "context" "google.golang.org/grpc" //"loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" "log" "reflect" "sync" ) var ( instance *EventSender once sync.Once ) const ( CENTRAL_ADDR = "192.168.1.105:50052" SENDER_ID = "OVERFLOW_EVENT_SENDER" DEFAULT_INTERVAL = 1 ) type Data struct { AgentId string SensorId string Data map[string]string StartedAt uint64 FinishedAt uint64 } func GetInstance() *EventSender { once.Do(func() { instance = &EventSender{} }) return instance } //func init() { // ch := make(chan interface{}, 0) // ch1 := make(chan interface{}, 0) // observer.Add(messages.CFG_LOADED, ch) // observer.Add(messages.SCF_STOPPED, ch1) // handleInit(ch, ch1) //} // //func handleInit(ch chan interface{}, ch1 chan interface{}) { // es := GetInstance() // go func() { // data := <-ch // log.Println("handleInit", data) // //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() // es.start() // observer.Remove(messages.CFG_LOADED, ch) // }() // // go func() { // data := <-ch1 // observer.Notify(messages.QUEUE_EVENT, data) // time.Sleep(2 * time.Second) // es.Stop() // }() //} func Start(ch chan bool) (err error) { es := GetInstance() es.start() ch <- true return nil } func Stop() (err error) { es := GetInstance() es.stop() return nil } func AddEventData(event interface{}) { es := GetInstance() es.lq.PushItem(event) } type EventSender struct { once sync.Once lq *q.LoafleQueue sc chan interface{} } func (es *EventSender) start() { es.sc = make(chan interface{}, 0) es.lq = q.NewQueue(DEFAULT_INTERVAL, es.sc) go es.checkQueue() } func (es *EventSender) stop() { if es.sc != nil { es.lq.Close() close(es.sc) es.sc = nil } } func (es *EventSender) checkQueue() { result := make([]*Data, 0) for sc := range es.sc { items := reflect.ValueOf(sc) if items.Kind() != reflect.Slice { log.Println("ddddd") } for i := 0; i < items.Len(); i++ { item := items.Index(i).Elem().Interface() tempCollectedData := item.(q.Item) collectedData := tempCollectedData.Value.(*Data) collectedData.AgentId = agentIdentifier() result = append(result, collectedData) //log.Println("Result Len: ", len(result)) } es.send(result) result = nil } } func (es *EventSender) send(data []*Data) { log.Println("Send Started") conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) if err != nil { log.Fatal("Connection Error :", err.Error()) return } defer conn.Close() client := pb.NewStatusClient(conn) out, err := client.Status(context.Background(), &pb.Empty{}) if err != nil { log.Fatal("client Error:", err.Error()) return } log.Print(out) } func agentIdentifier() string { return "agentID_000000001" }