From b4d709693cc4e583afc12bc9ecbc9b9edb929eed Mon Sep 17 00:00:00 2001 From: snoop Date: Thu, 27 Apr 2017 18:07:56 +0900 Subject: [PATCH] changed logic --- crawler_communicator.go | 48 ++++++++++++++ crawler_manager.go | 138 +++++++++++++++++---------------------- crawler_manager_event.go | 45 +++++++++++++ crawler_manager_test.go | 111 +++++++++++++++++++++---------- 4 files changed, 230 insertions(+), 112 deletions(-) create mode 100644 crawler_manager_event.go diff --git a/crawler_communicator.go b/crawler_communicator.go index 48848ee..3cb00e8 100644 --- a/crawler_communicator.go +++ b/crawler_communicator.go @@ -9,6 +9,8 @@ import ( "strings" + "loafle.com/overflow/agent_api/config_manager" + "encoding/json" ) @@ -41,6 +43,49 @@ func callAdd(container *string, crawlerName *string, id *string) bool { } +func callInitConfig(container *string, cl []*config_manager.Config) bool { + + port := GetInstance().portMap[*container] + + conn, err := grpc.Dial(address+port, grpc.WithInsecure()) + if err != nil { + log.Fatalf("did not connect: %v", err) + return false + } + defer conn.Close() + + cc := g.NewConfigClient(conn) + + inArr := &g.InputArray{} + + for _, conf := range cl { + + in := &g.Init{} + in.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name]) + + b, err := json.Marshal(conf) + if err != nil { + continue + } + + in.Path = string(b) + + inArr.In = append(inArr.In, in) + } + + + outInit, errInit := cc.Init(context.Background(), inArr) + if errInit != nil { + log.Println(errInit) + return false + } + log.Println("callInit:",outInit) + + return true + +} + + func callInit(container *string, paths *[]string) bool { port := GetInstance().portMap[*container] @@ -48,6 +93,9 @@ func callInit(container *string, paths *[]string) bool { return callInitAddress(address + port, paths) } + + + func callInitAddress(address string, paths *[]string) bool { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { diff --git a/crawler_manager.go b/crawler_manager.go index 163b672..23497eb 100644 --- a/crawler_manager.go +++ b/crawler_manager.go @@ -1,8 +1,8 @@ package crawler_manager import ( - "loafle.com/overflow/crawler_go/config" - "encoding/json" + //"loafle.com/overflow/crawler_go/config" + //"encoding/json" "io/ioutil" "google.golang.org/grpc" "strconv" @@ -12,8 +12,9 @@ import ( "log" - "path/filepath" + "errors" + "loafle.com/overflow/agent_api/config_manager" ) const ( @@ -22,7 +23,7 @@ const ( defaultPort = 50000 rootFolder = "/home/cm2/" - ConfigFolder = rootFolder + "/config/container/" // +container + ConfigFolder = rootFolder + "/config/container/" BinaryFolder = rootFolder + "/container/" PidFolder = rootFolder + "/pids/" runFile = "ttnc" @@ -34,23 +35,34 @@ type CrawlerManager struct { currentPort int portMap map[string]string pidMap map[string]string + ConfigMgr config_manager.ConfigManager } func init() { + GetInstance(); + + listenEvent() + + //g_CrawlerMananger.init() +} + +func GetInstance() *CrawlerManager { + if g_CrawlerMananger == nil { g_CrawlerMananger = &CrawlerManager{portMap:make(map[string]string),pidMap:make(map[string]string), currentPort:defaultPort} } - g_CrawlerMananger.init() + return g_CrawlerMananger; } + func (c *CrawlerManager)GetClient(container string) (*grpc.ClientConn, error) { b := c.checkContainer(&container) if b == false { - err := c.runAndInitContainer(&container) + err := c.runAndInitContainerOne(&container) if err != nil { return nil, err } @@ -60,66 +72,23 @@ func (c *CrawlerManager)GetClient(container string) (*grpc.ClientConn, error) { return grpc.Dial(address+gport, grpc.WithInsecure()) } -func GetInstance() *CrawlerManager { - return g_CrawlerMananger; -} -func readConfig(path string ) *config.Config { - - bytes, err := ioutil.ReadFile(path) - - if err != nil { - return nil - } - - cc := config.Config{} - - json.Unmarshal(bytes, &cc) - - return &cc -} - - func (c *CrawlerManager)init() { c.checkPid() - cs := c.isStartContainer() - - for _, cc := range cs { - - err := c.runAndInitContainer(&cc) - - if err != nil { - log.Println(err) - continue - } + cmap := c.ConfigMgr.GetCrawlers() + scm := sortContainer(cmap) + for ctn := range scm { + c.runAndInitContainer(&ctn, scm[ctn]) } } -func (c *CrawlerManager)isStartContainer() []string { - - files, _ := ioutil.ReadDir(ConfigFolder) - - var cs []string - - for _,file := range files { - - if file.IsDir() { - b := existConfigFile(ConfigFolder, file.Name()) - if b { - cs = append(cs, file.Name()) - } - } - - } - return cs -} func (c *CrawlerManager)checkContainer(container *string) bool { return callStatus(container) @@ -150,8 +119,7 @@ func (c *CrawlerManager)checkPid() { } - -func (c *CrawlerManager)runAndInitContainer(container *string) error { +func (c *CrawlerManager)runAndInitContainerOne(container *string) error { err := c.runContainer(container) @@ -159,12 +127,30 @@ func (c *CrawlerManager)runAndInitContainer(container *string) error { return err } - dirs := getConfigPaths(container) + cmap := c.ConfigMgr.GetCrawlers() + scm := sortContainer(cmap) - if dirs == nil { - return errors.New("not found config") + b := callInitConfig(container, scm[*container]) + + if b == false { + return errors.New("call init failed") } - b := callInit(container, dirs) + + return nil + +} + + +func (c *CrawlerManager)runAndInitContainer(container *string,cl []*config_manager.Config) error { + + err := c.runContainer(container) + + if err != nil { + return err + } + + b := callInitConfig(container, cl) + if b == false { return errors.New("call init failed") } @@ -173,6 +159,20 @@ func (c *CrawlerManager)runAndInitContainer(container *string) error { return nil } +func sortContainer(cm map[string]*config_manager.Config) map[string][]*config_manager.Config { + + m := make(map[string][]*config_manager.Config) + + var cn string; + for key := range cm { + cn = cm[key].Crawler.Container + m[cn] = append(m[cn], cm[key]) + } + + return m +} + + func (c *CrawlerManager)runContainer(container *string) error { b := c.checkContainer(container) @@ -264,23 +264,3 @@ func (c *CrawlerManager)removeProcessFile(pid *string) { log.Println(err) } } - - -func (c *CrawlerManager)activeCrawler(container *string) []string { - - var dirs []string - existConfigFileDir(ConfigFolder,*container,&dirs) - - if len(dirs) <= 0 { - return nil - } - - var r []string - for _, path := range dirs { - r = append(r, filepath.Base(path)) - } - - return r - -} - diff --git a/crawler_manager_event.go b/crawler_manager_event.go new file mode 100644 index 0000000..05e3cdf --- /dev/null +++ b/crawler_manager_event.go @@ -0,0 +1,45 @@ +package crawler_manager + +import ( + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/config_manager" + "loafle.com/overflow/agent_api/observer/messages" + + "fmt" +) + + +func listenEvent() { + + go listenConfigLoaded(); + go listenAddSensor(); + +} + +func listenConfigLoaded() { + + ch := make(chan interface{}, 0) + observer.Add(messages.CONFIGMANAGER_LOADED, ch) + + o := <-ch + + cm := o.(config_manager.ConfigManager) + + GetInstance().ConfigMgr = cm; + + GetInstance().init(); + +} + +func listenAddSensor() { + ch := make(chan interface{}, 0) + observer.Add(messages.ADD_SENSOR_1, ch); + + o := <-ch + + str := o.(string) + + fmt.Println(str) + //GetInstance().Addconfig(str) +} + diff --git a/crawler_manager_test.go b/crawler_manager_test.go index 8b7bee0..f6ab803 100644 --- a/crawler_manager_test.go +++ b/crawler_manager_test.go @@ -12,7 +12,9 @@ import ( "strings" "io/ioutil" - + "loafle.com/overflow/agent_api/config_manager" + "encoding/json" + "fmt" ) @@ -218,39 +220,39 @@ func TestInitCM(t *testing.T) { } func TestCallStatus(t *testing.T) { - c := "java" - GetInstance().runAndInitContainer(&c) - - callStatus(&c) - - GetInstance().stopContainer(&c) + //c := "java" + //GetInstance().runAndInitContainer(&c) + // + //callStatus(&c) + // + //GetInstance().stopContainer(&c) } func TestCallAdd(t *testing.T) { + //// + //c := "java" + //cn := "HEALTH_"+"MYSQL" + //id := "id.json" // - c := "java" - cn := "HEALTH_"+"MYSQL" - id := "id.json" - - GetInstance().runAndInitContainer(&c) - - callAdd(&c, &cn, &id) - - GetInstance().stopContainer(&c) + //GetInstance().runAndInitContainer(&c) + // + //callAdd(&c, &cn, &id) + // + //GetInstance().stopContainer(&c) } func TestCallRemove22(t *testing.T) { - c := "java" - cn := "HEALTH_"+"MYSQL" - id := "id.json" - - GetInstance().runAndInitContainer(&c) - - callRemove(&c, &cn, &id) - - GetInstance().stopContainer(&c) + //c := "java" + //cn := "HEALTH_"+"MYSQL" + //id := "id.json" + // + //GetInstance().runAndInitContainer(&c) + // + //callRemove(&c, &cn, &id) + // + //GetInstance().stopContainer(&c) } @@ -268,14 +270,14 @@ func TestRunRun(t *testing.T) { func TestActiveCrawler(t *testing.T) { - c := "java" - GetInstance().runContainer(&c) - - aa := GetInstance().activeCrawler(&c) - - t.Log(aa) - - GetInstance().stopContainer(&c) + //c := "java" + //GetInstance().runContainer(&c) + // + //aa := GetInstance().activeCrawler(&c) + // + //t.Log(aa) + // + //GetInstance().stopContainer(&c) } @@ -289,4 +291,47 @@ func TestErer(t *testing.T) { } +func TestStartContainer(t *testing.T) { + + cs := GetStartContainer() + + t.Log(cs) + +} + + +func GetStartContainer() []string { + + files, _ := ioutil.ReadDir(ConfigFolder) + + var cs []string + + for _,file := range files { + + if file.IsDir() { + b := existConfigFile(ConfigFolder, file.Name()) + if b { + cs = append(cs, file.Name()) + } + } + + } + return cs +} + +func TestJson(t *testing.T) { + + c := config_manager.Config{} + + c.Crawler.Name = "wmi_crawler" + c.Crawler.Container = "java_proxy"; + + c.Id = "WMI_TEST_ID_001" + + + b, _ := json.Marshal(c) + + fmt.Println(string(b)) + +}