data_sender

This commit is contained in:
insanity@loafle.com 2017-05-11 18:58:19 +09:00
parent c1ca0aa45c
commit 762e5cf64b
2 changed files with 34 additions and 14 deletions

View File

@ -6,7 +6,6 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"io/ioutil" "io/ioutil"
cm "loafle.com/overflow/agent_api/config_manager" cm "loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages" "loafle.com/overflow/agent_api/observer/messages"
pb "loafle.com/overflow/crawler_go/grpc" pb "loafle.com/overflow/crawler_go/grpc"
q "loafle.com/overflow/queue_go" q "loafle.com/overflow/queue_go"
@ -40,7 +39,7 @@ type DataSender struct {
} }
func init() { func init() {
addObservers() go handleConfigLoaded()
} }
func GetInstance() *DataSender { func GetInstance() *DataSender {
@ -50,20 +49,14 @@ func GetInstance() *DataSender {
return instance return instance
} }
func addObservers() { func startDataSender(ch chan interface{}) {
ch := make(chan interface{}, 0)
observer.Add(messages.CONFIGMANAGER_LOADED, ch)
handleInit(ch)
}
func handleInit(ch chan interface{}) {
ds := GetInstance() ds := GetInstance()
go func() { go func() {
data := <-ch data := <-ch
log.Println(data) log.Println(data)
//ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
ds.start() ds.start()
close(ch) ds.addObservers()
}() }()
} }
@ -71,14 +64,17 @@ func (ds *DataSender) start() {
ds.once.Do(func() { ds.once.Do(func() {
qc := make(chan interface{}) qc := make(chan interface{})
ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc) ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc)
go ds.handleData(qc) go ds.handleData(qc)
}) })
} }
func (ds *DataSender) Stop() {
observer.Notify(messages.DATA_SENDER_STOPPED, true) func (ds *DataSender) addObservers() {
go ds.handleAgentStop()
go ds.handleCollectedData()
} }
func (ds *DataSender) Stop() {}
func (ds *DataSender) handleData(qc chan interface{}) { func (ds *DataSender) handleData(qc chan interface{}) {
for { for {
select { select {

24
data_sender_event.go Normal file
View File

@ -0,0 +1,24 @@
package data_sender_go
import (
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
)
func handleConfigLoaded() {
ch := make(chan interface{}, 0)
observer.Add(messages.CFG_LOADED, ch)
startDataSender(ch)
observer.Remove(messages.CFG_LOADED, ch)
}
func (ds *DataSender) handleAgentStop() {
ch := make(chan interface{}, 0)
observer.Add(messages.AGT_STOPPED, ch)
ds.Stop()
observer.Remove(messages.AGT_STOPPED, ch)
}
func (ds *DataSender) handleCollectedData() {
}