package crawler_manager import ( //"loafle.com/overflow/crawler_go/config" //"encoding/json" "io/ioutil" "google.golang.org/grpc" "strconv" "os/exec" "time" "os" "log" "errors" "loafle.com/overflow/agent_api/config_manager" ) const ( address = "localhost:" portArgsName = "-Port=" defaultPort = 50000 rootFolder = "/home/cm2/" ConfigFolder = rootFolder + "/config/container/" BinaryFolder = rootFolder + "/container/" PidFolder = rootFolder + "/pids/" runFile = "ttnc" ) var g_CrawlerMananger *CrawlerManager = nil type CrawlerManager struct { currentPort int portMap map[string]string pidMap map[string]string ConfigMgr config_manager.ConfigManager } func init() { GetInstance(); listenEvent() //g_CrawlerMananger.init() } func GetInstance() *CrawlerManager { if g_CrawlerMananger == nil { g_CrawlerMananger = &CrawlerManager{portMap:make(map[string]string),pidMap:make(map[string]string), currentPort:defaultPort} } return g_CrawlerMananger; } func (c *CrawlerManager)GetClient(container string) (*grpc.ClientConn, error) { b := c.checkContainer(&container) if b == false { err := c.runAndInitContainerOne(&container) if err != nil { return nil, err } } gport := c.portMap[container] return grpc.Dial(address+gport, grpc.WithInsecure()) } func (c *CrawlerManager)init() { c.checkPid() cmap := c.ConfigMgr.GetCrawlers() scm := sortContainer(cmap) for ctn := range scm { c.runAndInitContainer(&ctn, scm[ctn]) } } func (c *CrawlerManager)checkContainer(container *string) bool { return callStatus(container) } func (c *CrawlerManager)checkAndRunContainer(container *string) bool { b := callStatus(container) if b == false { err := c.runAndInitContainerOne(container) if err != nil { return false } } return true } func (c *CrawlerManager)checkPid() { files, err := ioutil.ReadDir(PidFolder) if err != nil { log.Println(err) return } for _, file := range files { if file.IsDir() { continue } str := file.Name() c.stopProcess(&str) c.removeProcessFile(&str) } } func (c *CrawlerManager)runAndInitContainerOne(container *string) error { err := c.runContainer(container) if err != nil { return err } cmap := c.ConfigMgr.GetCrawlers() scm := sortContainer(cmap) b := callInitConfig(container, scm[*container]) if b == false { return errors.New("call init failed") } return nil } func (c *CrawlerManager)runAndInitContainer(container *string,cl []*config_manager.Config) error { err := c.runContainer(container) if err != nil { return err } b := callInitConfig(container, cl) if b == false { return errors.New("call init failed") } return nil } func sortContainer(cm map[string]*config_manager.Config) map[string][]*config_manager.Config { m := make(map[string][]*config_manager.Config) var cn string; for key := range cm { cn = cm[key].Crawler.Container m[cn] = append(m[cn], cm[key]) } return m } func (c *CrawlerManager)runContainer(container *string) error { b := c.checkContainer(container) if b { return nil } cmdStr := getRunCommand(container) for { pArg := portArgsName + strconv.Itoa(c.currentPort) cmd := exec.Command(cmdStr, pArg) err := cmd.Start() if err != nil { //run error break; log.Println(err) return err } time.Sleep(time.Duration( time.Second * 2)) b := callStatusAddress(address + strconv.Itoa(c.currentPort)) if b == false { c.currentPort++ continue; } log.Println(*container + " run success port:" , c.currentPort , "pid:", cmd.Process.Pid ) writePid(cmd.Process.Pid) c.portMap[*container] = strconv.Itoa(c.currentPort) c.pidMap[*container] = strconv.Itoa(cmd.Process.Pid) c.currentPort++ break; } return nil } func (c *CrawlerManager)addSensor(id string) { conf := c.ConfigMgr.GetCrawlerById(id) b := c.checkAndRunContainer(&conf.Crawler.Name) if b == false { return } b = callAdd(&conf.Crawler.Container, conf) if b { //FIXME :: Noti ADD_SENSOR_2_END } } func (c *CrawlerManager)removeSensor(id string) { conf := c.ConfigMgr.GetCrawlerById(id) callRemove(&conf.Crawler.Name, conf) //remove and stop } func (c *CrawlerManager)stopContainerAll() { for k, _ := range c.pidMap { c.stopContainer(&k) } } func (c *CrawlerManager)stopContainer(container *string) { pid := c.pidMap[*container] if len(pid) <= 0 { return } c.stopProcess(&pid) c.removeProcessFile(&pid) delete(c.pidMap, *container) delete(c.portMap, *container) } func (c *CrawlerManager)stopProcess(pid *string) { pidi, err := strconv.Atoi(*pid) if err != nil { log.Println(err) } p, err := os.FindProcess(pidi) if err != nil { log.Println(err) } p.Kill() } func (c *CrawlerManager)removeProcessFile(pid *string) { err := os.Remove(PidFolder + "/" +*pid) if err != nil { log.Println(err) } }