diff --git a/crawler_communicator.go b/crawler_communicator.go index 3cb00e8..287279a 100644 --- a/crawler_communicator.go +++ b/crawler_communicator.go @@ -1,20 +1,20 @@ package crawler_manager import ( - "path/filepath" + "google.golang.org/grpc" "log" g "loafle.com/overflow/crawler_go/grpc" "context" - "strings" + "loafle.com/overflow/agent_api/config_manager" "encoding/json" ) -func callAdd(container *string, crawlerName *string, id *string) bool { +func callAdd(container *string, conf *config_manager.Config) bool { port := GetInstance().portMap[*container] @@ -27,10 +27,15 @@ func callAdd(container *string, crawlerName *string, id *string) bool { cc := g.NewConfigClient(conn) - in := &g.Input{} + in := &g.InputAdd{} - in.Id = *id - in.Name = g.Crawlers(g.Crawlers_value[*crawlerName]) + b, err := json.Marshal(conf) + if err != nil { + return false + } + + in.Data = b + in.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name]) out, err := cc.Add(context.Background(), in) if err != nil { @@ -68,8 +73,7 @@ func callInitConfig(container *string, cl []*config_manager.Config) bool { continue } - in.Path = string(b) - + in.Data = b inArr.In = append(inArr.In, in) } @@ -86,53 +90,53 @@ func callInitConfig(container *string, cl []*config_manager.Config) bool { } -func callInit(container *string, paths *[]string) bool { - - port := GetInstance().portMap[*container] - - return callInitAddress(address + port, paths) -} +//func callInit(container *string, paths *[]string) bool { +// +// port := GetInstance().portMap[*container] +// +// return callInitAddress(address + port, paths) +//} +// +// +//func callInitAddress(address string, paths *[]string) bool { +// conn, err := grpc.Dial(address, grpc.WithInsecure()) +// if err != nil { +// log.Fatalf("did not connect: %v", err) +// return false +// } +// defer conn.Close() +// +// cc := g.NewConfigClient(conn) +// +// inArr := &g.InputArray{} +// base := "HEALTH_" +// for _, path := range *paths { +// +// in := &g.Init{} +// //in.Path = "/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/" +// in.Path = path + "/" +// bcn := filepath.Base(path) +// bcn = strings.ToUpper(bcn) +// +// in.Name = g.Crawlers(g.Crawlers_value[base+bcn]) +// //in.Name = g.Crawlers(g.Crawlers_value[g.Crawlers_HEALTH_REDIS.String()]) //test +// inArr.In = append(inArr.In, in) +// } +// +// outInit, errInit := cc.Init(context.Background(), inArr) +// if errInit != nil { +// log.Println(errInit) +// return false +// } +// log.Println("callInit:",outInit) +// +// return true +//} -func callInitAddress(address string, paths *[]string) bool { - conn, err := grpc.Dial(address, grpc.WithInsecure()) - if err != nil { - log.Fatalf("did not connect: %v", err) - return false - } - defer conn.Close() - - cc := g.NewConfigClient(conn) - - inArr := &g.InputArray{} - base := "HEALTH_" - for _, path := range *paths { - - in := &g.Init{} - //in.Path = "/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/" - in.Path = path + "/" - bcn := filepath.Base(path) - bcn = strings.ToUpper(bcn) - - in.Name = g.Crawlers(g.Crawlers_value[base+bcn]) - //in.Name = g.Crawlers(g.Crawlers_value[g.Crawlers_HEALTH_REDIS.String()]) //test - inArr.In = append(inArr.In, in) - } - - outInit, errInit := cc.Init(context.Background(), inArr) - if errInit != nil { - log.Println(errInit) - return false - } - log.Println("callInit:",outInit) - - return true -} - - -func callRemove(container *string, crawlerName *string, id *string) { +func callRemove(container *string, conf *config_manager.Config) { port := GetInstance().portMap[*container] @@ -146,8 +150,8 @@ func callRemove(container *string, crawlerName *string, id *string) { cc := g.NewConfigClient(conn) inR := &g.Input{} - inR.Id = *id - inR.Name = g.Crawlers(g.Crawlers_value[*crawlerName]) + //inR.Id = *id //FIXME + inR.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name]) outRem, errRem := cc.Remove(context.Background(), inR) if errRem != nil { diff --git a/crawler_manager.go b/crawler_manager.go index 23497eb..8167172 100644 --- a/crawler_manager.go +++ b/crawler_manager.go @@ -94,6 +94,20 @@ func (c *CrawlerManager)checkContainer(container *string) bool { return callStatus(container) } +func (c *CrawlerManager)checkAndRunContainer(container *string) bool { + + b := callStatus(container) + + if b == false { + err := c.runAndInitContainerOne(container) + if err != nil { + return false + } + } + + return true +} + func (c *CrawlerManager)checkPid() { @@ -216,6 +230,33 @@ func (c *CrawlerManager)runContainer(container *string) error { return nil } +func (c *CrawlerManager)addSensor(id string) { + + conf := c.ConfigMgr.GetCrawlerById(id) + + b := c.checkAndRunContainer(&conf.Crawler.Name) + + if b == false { + return + } + + b = callAdd(&conf.Crawler.Container, conf) + + if b { + //FIXME :: Noti ADD_SENSOR_2_END + } +} + +func (c *CrawlerManager)removeSensor(id string) { + + conf := c.ConfigMgr.GetCrawlerById(id) + + callRemove(&conf.Crawler.Name, conf) + + //remove and stop + +} + func (c *CrawlerManager)stopContainerAll() { diff --git a/crawler_manager_event.go b/crawler_manager_event.go index 05e3cdf..87a244b 100644 --- a/crawler_manager_event.go +++ b/crawler_manager_event.go @@ -13,6 +13,7 @@ func listenEvent() { go listenConfigLoaded(); go listenAddSensor(); + go listenRemoveSensor(); } @@ -40,6 +41,21 @@ func listenAddSensor() { str := o.(string) fmt.Println(str) - //GetInstance().Addconfig(str) + + GetInstance().addSensor(str) + +} + +func listenRemoveSensor() { + + ch := make(chan interface{}, 0) + //observer.Add(messages.REMOVE_SENSOR_2_END, ch); + + o := <-ch + + str := o.(string) + + GetInstance().removeSensor(str) + }