diff --git a/event_sender.go b/event_sender.go index 01dcc5a..b88eb41 100644 --- a/event_sender.go +++ b/event_sender.go @@ -3,14 +3,12 @@ package event_sender_go import ( "context" "google.golang.org/grpc" - "loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" "log" "reflect" "sync" - "time" ) var ( @@ -39,30 +37,45 @@ func GetInstance() *EventSender { return instance } -func init() { - ch := make(chan interface{}, 0) - ch1 := make(chan interface{}, 0) - observer.Add(messages.CFG_LOADED, ch) - observer.Add(messages.SCF_STOPPED, ch1) - handleInit(ch, ch1) +//func init() { +// ch := make(chan interface{}, 0) +// ch1 := make(chan interface{}, 0) +// observer.Add(messages.CFG_LOADED, ch) +// observer.Add(messages.SCF_STOPPED, ch1) +// handleInit(ch, ch1) +//} +// +//func handleInit(ch chan interface{}, ch1 chan interface{}) { +// es := GetInstance() +// go func() { +// data := <-ch +// log.Println("handleInit", data) +// //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() +// es.start() +// observer.Remove(messages.CFG_LOADED, ch) +// }() +// +// go func() { +// data := <-ch1 +// observer.Notify(messages.QUEUE_EVENT, data) +// time.Sleep(2 * time.Second) +// es.Stop() +// }() +//} + + +func Start(ch chan bool) (err error) { + es := GetInstance() + + es.start() + + ch <- true + return nil } -func handleInit(ch chan interface{}, ch1 chan interface{}) { +func AddEventData(event interface{}) { es := GetInstance() - go func() { - data := <-ch - log.Println("handleInit", data) - //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() - es.start() - observer.Remove(messages.CFG_LOADED, ch) - }() - - go func() { - data := <-ch1 - observer.Notify(messages.QUEUE_EVENT, data) - time.Sleep(2 * time.Second) - es.Stop() - }() + es.lq.PushItem(event) } type EventSender struct { @@ -74,13 +87,14 @@ type EventSender struct { func (es *EventSender) start() { es.sc = make(chan interface{}, 0) - es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc) + es.lq = q.NewQueue(DEFAULT_INTERVAL, es.sc) go es.checkQueue() } func (es *EventSender) Stop() { if es.sc != nil { + es.lq.Close() close(es.sc) es.sc = nil }