observer -> call method

This commit is contained in:
snoop 2017-05-16 18:05:49 +09:00
parent 56ef04f440
commit 0bd2515a19
3 changed files with 129 additions and 64 deletions

View File

@ -9,7 +9,7 @@ import (
"os"
"strings"
"sync"
"loafle.com/overflow/agent_api/messages"
)
type configManager struct {
@ -20,26 +20,6 @@ type configManager struct {
var _configManager *configManager
func Start(ch chan *config_manager.GlobalConfig, path string) error {
_configManager = GetInstance()
_configManager.loadGlobalConfig(path + "/" + "global.yaml")
ch <- _configManager.GetGlobalConfig()
return nil
}
func StartSensorConfig(ch chan config_manager.ConfigManager, authKey string) error {
if err := _configManager.loadCrawlerConfigAll(); err != nil {
return err
}
ch <- _configManager
return nil
}
func Stop(ch chan bool) {
GetInstance().stop()
ch <- true
}
func GetInstance() *configManager {
var once sync.Once
once.Do(func() {
@ -79,7 +59,7 @@ func (c *configManager) loadGlobalConfig(path string) {
data, err := ioutil.ReadFile(path)
if err != nil {
}
err = yaml.Unmarshal([]byte(data), &c.globalConfig)
err = yaml.Unmarshal(data, &c.globalConfig)
if err != nil {
}
}
@ -129,14 +109,14 @@ func (c *configManager) loadCrawlerConfig(root string, dir string) {
}
func (c *configManager) addConfig() {
for data := range c.addCh {
func (c *configManager) addConfig(tmp string) {
//for data := range c.addCh {
path := data.(string)
path := tmp
b, err := ioutil.ReadFile(path)
if err != nil {
// error process
continue
log.Panic(err)
}
// 임시파일을 로드 , Config로 변환
@ -144,7 +124,7 @@ func (c *configManager) addConfig() {
err = json.Unmarshal(b, &m)
if err != nil {
// error process
continue
log.Panic(err)
}
// agent 폴더 / config / crawler / .. / .. / .. / 에 해당하는 파일이 있는지 확인, 있다면 삭제
@ -156,28 +136,28 @@ func (c *configManager) addConfig() {
err = os.Remove(path)
if err != nil {
// error process
continue
log.Panic(err)
}
// Config 맵에 저장
c.configs[m.Id] = &m
// notify id
err = observer.Notify(messages.SCF_SENSOR_ADD_DONE, m.Id)
if err != nil {
continue
}
}
//err = observer.Notify(messages.SCF_SENSOR_ADD_DONE, m.Id)
//if err != nil {
// continue
//}
//}
}
func (c *configManager) removeConfig() {
for data := range c.removeCh {
removeid := data.(string)
func (c *configManager) removeConfig(id string) {
//for data := range c.removeCh {
removeid := id
// check exists
config, ok := c.configs[removeid]
if !ok {
continue
return;
}
// 해당 파일 삭제
@ -185,18 +165,18 @@ func (c *configManager) removeConfig() {
err := os.Remove(path)
if err != nil {
// error check
continue
log.Panic(err)
}
// 해당 id 삭제
delete(c.configs, removeid)
// notify id
err = observer.Notify("TASK_DONE", removeid)
if err != nil {
continue
}
}
//err = observer.Notify("TASK_DONE", removeid)
//if err != nil {
// continue
//}
//}
}

View File

@ -2,37 +2,37 @@ package config_manager_go
import (
"testing"
"github.com/stretchr/testify/assert"
//"github.com/stretchr/testify/assert"
"io/ioutil"
"log"
"loafle.com/overflow/agent_api/config_manager"
"encoding/json"
"github.com/google/uuid"
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
"loafle.com/overflow/agent_api/messages"
"time"
)
func TestLoadConfig(t *testing.T) {
// notify temp channel
ch := make(chan interface{},0)
observer.Add(messages.CFG_LOADED,ch)
go func() {
data :=<- ch
c := data.(config_manager.ConfigManager)
cc := c.GetSensors()
assert.NotEqual(t, len(cc),0)
}()
//ch := make(chan interface{},0)
//observer.Add(messages.CFG_LOADED,ch)
//go func() {
// data :=<- ch
// c := data.(config_manager.ConfigManager)
// cc := c.GetSensors()
// assert.NotEqual(t, len(cc),0)
//}()
// make config manager after to load
c := NewConfigManager()
c.loadGlobalConfig("/root/gowork/src/loafle.com/overflow/config_manager_go/test_agent/global.yaml")
c.loadCrawlerConfigAll()
assert.NotEqual(t, len(c.configs),0)
observer.Notify(messages.CFG_LOADED,c)
time.Sleep(1 * time.Second)
//c := NewConfigManager()
//c.loadGlobalConfig("/root/gowork/src/loafle.com/overflow/config_manager_go/test_agent/global.yaml")
//c.loadCrawlerConfigAll()
//
//assert.NotEqual(t, len(c.configs),0)
//observer.Notify(messages.CFG_LOADED,c)
//time.Sleep(1 * time.Second)
}
@ -60,11 +60,11 @@ func TestAddConfig(t *testing.T) {
func TestRemoveConfig(t *testing.T) {
_configManager.loadGlobalConfig("/root/gowork/src/loafle.com/overflow/config_manager_go/test_agent/global.yaml")
GetInstance()
_configManager.loadGlobalConfig("/home/snoop/develop/path/go/src/loafle.com/overflow/config_manager_go/test_agent/global.yaml")
_configManager.loadCrawlerConfigAll()
// remove test
observer.Notify(messages.TASK_SENSOR_REMOVE,"d0fcc7b1-43a7-4acd-a7bf-c9572a9d4c9e")
//observer.Notify(messages.TASK_SENSOR_REMOVE,"d0fcc7b1-43a7-4acd-a7bf-c9572a9d4c9e")
time.Sleep(1 * time.Second)
}

View File

@ -1,10 +1,95 @@
package config_manager_go
import (
"loafle.com/overflow/agent_api/config_manager"
)
//import (
// "loafle.com/overflow/agent_api/observer"
// "loafle.com/overflow/agent_api/messages"
//)
//
func Start(ch chan *config_manager.GlobalConfig, path string) error {
_configManager = GetInstance()
_configManager.loadGlobalConfig(path + "/" + "global.yaml")
ch <- _configManager.GetGlobalConfig()
return nil
}
func StartSensorConfig(ch chan config_manager.ConfigManager, authKey string) error {
if err := _configManager.loadCrawlerConfigAll(); err != nil {
return err
}
ch <- _configManager
return nil
}
func Stop(ch chan bool) {
GetInstance().stop()
ch <- true
}
func AddSensor(tmpFilePath string) {
GetInstance().addConfig(tmpFilePath)
////move file
//moveFile := tmpFilePath
////load file
//
//file, err := os.Open(moveFile)
//
//if err != nil {
// log.Panic(err)
//}
//
//b, err := ioutil.ReadFile(moveFile)
//if err != nil {
// log.Panic(err)
//}
//var m = config_manager.Config{}
//json.Unmarshal(b, &m)
//GetInstance().configs[file.Name()] = &m
}
func RemoveSensor(id string) {
//remove object
// remove file
GetInstance().removeConfig(id)
}
func UpdateSensor(tmpFile string) {
//GetInstance().
//overwrite file
// reload file
GetInstance().addConfig(tmpFile)
}
//func agentStartHandler() {
// agentStart := make(chan interface{}, 0)
// observer.Add(messages.AGT_STARTING, agentStart)