From 762e5cf64bef4e98d96ba168bd603e813609082f Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Thu, 11 May 2017 18:58:19 +0900 Subject: [PATCH] data_sender --- data_sender.go | 24 ++++++++++-------------- data_sender_event.go | 24 ++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 14 deletions(-) create mode 100644 data_sender_event.go diff --git a/data_sender.go b/data_sender.go index 10db22e..2310844 100644 --- a/data_sender.go +++ b/data_sender.go @@ -6,7 +6,6 @@ import ( "google.golang.org/grpc" "io/ioutil" cm "loafle.com/overflow/agent_api/config_manager" - "loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" @@ -40,7 +39,7 @@ type DataSender struct { } func init() { - addObservers() + go handleConfigLoaded() } func GetInstance() *DataSender { @@ -50,20 +49,14 @@ func GetInstance() *DataSender { return instance } -func addObservers() { - ch := make(chan interface{}, 0) - observer.Add(messages.CONFIGMANAGER_LOADED, ch) - handleInit(ch) -} - -func handleInit(ch chan interface{}) { +func startDataSender(ch chan interface{}) { ds := GetInstance() go func() { data := <-ch log.Println(data) - //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() + ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() ds.start() - close(ch) + ds.addObservers() }() } @@ -71,14 +64,17 @@ func (ds *DataSender) start() { ds.once.Do(func() { qc := make(chan interface{}) ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc) - go ds.handleData(qc) }) } -func (ds *DataSender) Stop() { - observer.Notify(messages.DATA_SENDER_STOPPED, true) + +func (ds *DataSender) addObservers() { + go ds.handleAgentStop() + go ds.handleCollectedData() } +func (ds *DataSender) Stop() {} + func (ds *DataSender) handleData(qc chan interface{}) { for { select { diff --git a/data_sender_event.go b/data_sender_event.go new file mode 100644 index 0000000..a527851 --- /dev/null +++ b/data_sender_event.go @@ -0,0 +1,24 @@ +package data_sender_go + +import ( + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" +) + +func handleConfigLoaded() { + ch := make(chan interface{}, 0) + observer.Add(messages.CFG_LOADED, ch) + startDataSender(ch) + observer.Remove(messages.CFG_LOADED, ch) +} + +func (ds *DataSender) handleAgentStop() { + ch := make(chan interface{}, 0) + observer.Add(messages.AGT_STOPPED, ch) + ds.Stop() + observer.Remove(messages.AGT_STOPPED, ch) +} + +func (ds *DataSender) handleCollectedData() { + +}