This commit is contained in:
geek 2017-05-15 18:36:08 +09:00
parent 66e517fc35
commit 5b9d23fdaa

View File

@ -3,14 +3,12 @@ package event_sender_go
import ( import (
"context" "context"
"google.golang.org/grpc" "google.golang.org/grpc"
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages" "loafle.com/overflow/agent_api/observer/messages"
pb "loafle.com/overflow/crawler_go/grpc" pb "loafle.com/overflow/crawler_go/grpc"
q "loafle.com/overflow/queue_go" q "loafle.com/overflow/queue_go"
"log" "log"
"reflect" "reflect"
"sync" "sync"
"time"
) )
var ( var (
@ -39,30 +37,45 @@ func GetInstance() *EventSender {
return instance return instance
} }
func init() { //func init() {
ch := make(chan interface{}, 0) // ch := make(chan interface{}, 0)
ch1 := make(chan interface{}, 0) // ch1 := make(chan interface{}, 0)
observer.Add(messages.CFG_LOADED, ch) // observer.Add(messages.CFG_LOADED, ch)
observer.Add(messages.SCF_STOPPED, ch1) // observer.Add(messages.SCF_STOPPED, ch1)
handleInit(ch, ch1) // handleInit(ch, ch1)
//}
//
//func handleInit(ch chan interface{}, ch1 chan interface{}) {
// es := GetInstance()
// go func() {
// data := <-ch
// log.Println("handleInit", data)
// //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
// es.start()
// observer.Remove(messages.CFG_LOADED, ch)
// }()
//
// go func() {
// data := <-ch1
// observer.Notify(messages.QUEUE_EVENT, data)
// time.Sleep(2 * time.Second)
// es.Stop()
// }()
//}
func Start(ch chan bool) (err error) {
es := GetInstance()
es.start()
ch <- true
return nil
} }
func handleInit(ch chan interface{}, ch1 chan interface{}) { func AddEventData(event interface{}) {
es := GetInstance() es := GetInstance()
go func() { es.lq.PushItem(event)
data := <-ch
log.Println("handleInit", data)
//ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
es.start()
observer.Remove(messages.CFG_LOADED, ch)
}()
go func() {
data := <-ch1
observer.Notify(messages.QUEUE_EVENT, data)
time.Sleep(2 * time.Second)
es.Stop()
}()
} }
type EventSender struct { type EventSender struct {
@ -74,13 +87,14 @@ type EventSender struct {
func (es *EventSender) start() { func (es *EventSender) start() {
es.sc = make(chan interface{}, 0) es.sc = make(chan interface{}, 0)
es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc) es.lq = q.NewQueue(DEFAULT_INTERVAL, es.sc)
go es.checkQueue() go es.checkQueue()
} }
func (es *EventSender) Stop() { func (es *EventSender) Stop() {
if es.sc != nil { if es.sc != nil {
es.lq.Close()
close(es.sc) close(es.sc)
es.sc = nil es.sc = nil
} }