// Package crawler_manager starts crawler container processes, remembers the
// port and pid each one was started with, stops them again, and opens gRPC
// client connections to them.
package crawler_manager

import (
	//"git.loafle.net/overflow/overflow_probe/crawler/config"
	//"encoding/json"
	"errors"
	"io/ioutil"
	"log"
	"os"
	"os/exec"
	"strconv"
	"time"

	"git.loafle.net/overflow/overflow_probe/agent_api/config_manager"
	"google.golang.org/grpc"
)

const (
	// address is the host prefix a container port is appended to.
	address = "localhost:"
	// portArgsName is not referenced by the code in this file.
	// NOTE(review): runContainer passes the bare port number as the only
	// argument; confirm whether this prefix was meant to be used there.
	portArgsName = "-Port="
	// defaultPort is the first port handed to a started container.
	defaultPort = 50000
	//rootFolder = "/home/cm2/"
	//ConfigFolder = rootFolder + "/config/container/"
	//BinaryFolder = rootFolder + "/container/"
	//PidFolder = rootFolder + "/pids/"
	//runFile = "ttnc"
)

// g_CrawlerMananger is the package-wide manager created lazily by GetInstance.
var g_CrawlerMananger *CrawlerManager

// CrawlerManager tracks the container processes started by this package.
type CrawlerManager struct {
	// currentPort is the next port runContainer will try.
	currentPort int
	// portMap maps a container name to the port (decimal string) it was
	// started on.
	portMap map[string]string
	// pidMap maps a container name to the pid (decimal string) of its process.
	pidMap map[string]string
	// ConfigMgr supplies the global path configuration and sensor configs.
	ConfigMgr config_manager.ConfigManager
}

// init creates the shared manager and calls listenEvent at package load time.
func init() {
	GetInstance()
	listenEvent()
	//g_CrawlerMananger.init()
}

// SettingPath prefixes the binary, config and pid folders of the global
// configuration with its root folder.
//
// Every call prepends the root folder again, so it is meant to be called once.
func SettingPath() {
	//test
	//GetInstance().ConfigMgr.GetGlobalConfig().Paths.RootFolder = "/home/cm2/"
	cfg := GetInstance().ConfigMgr.GetGlobalConfig()
	cfg.Paths.BinaryFolder = cfg.Paths.RootFolder + cfg.Paths.BinaryFolder
	cfg.Paths.ConfigFolder = cfg.Paths.RootFolder + cfg.Paths.ConfigFolder
	cfg.Paths.PidFolder = cfg.Paths.RootFolder + cfg.Paths.PidFolder
}

// GetInstance returns the shared CrawlerManager, creating it on first use with
// empty port/pid maps and currentPort set to defaultPort. It performs no
// locking.
func GetInstance() *CrawlerManager {
	if g_CrawlerMananger == nil {
		g_CrawlerMananger = &CrawlerManager{
			portMap:     make(map[string]string),
			pidMap:      make(map[string]string),
			currentPort: defaultPort,
		}
	}
	return g_CrawlerMananger
}

// GetClient returns a gRPC connection to the named container, starting and
// initialising the container first when its status check fails.
//
// It returns an error when the container could not be started or when no port
// is recorded for it (previously that case dialled the bare "localhost:").
func (c *CrawlerManager) GetClient(container string) (*grpc.ClientConn, error) {
	if !c.checkContainer(&container) {
		if err := c.runAndInitContainerOne(&container); err != nil {
			return nil, err
		}
	}

	gport := c.portMap[container]
	if gport == "" {
		return nil, errors.New("no port registered for container " + container)
	}
	return grpc.Dial(address+gport, grpc.WithInsecure())
}

// init stops processes listed in the pid folder, then starts and initialises
// one container per container name found in the sensor configuration. It stops
// at the first container that fails and returns that error.
func (c *CrawlerManager) init() error {
	c.checkPid()

	for ctn, confs := range sortContainer(c.ConfigMgr.GetSensors()) {
		name := ctn
		if err := c.runAndInitContainer(&name, confs); err != nil {
			return err
		}
	}
	return nil
}

// checkContainer reports the result of callStatus for the container.
func (c *CrawlerManager) checkContainer(container *string) bool {
	return callStatus(container)
}

// checkAndRunContainer reports true when callStatus succeeds for the
// container, or when it does not but the container could be started and
// initialised.
func (c *CrawlerManager) checkAndRunContainer(container *string) bool {
	if callStatus(container) {
		return true
	}
	return c.runAndInitContainerOne(container) == nil
}

// checkPid walks the pid folder and, for every regular entry, treats the file
// name as a pid: the process is killed and the file removed. A folder that
// cannot be read is logged and otherwise ignored.
func (c *CrawlerManager) checkPid() {
	files, err := ioutil.ReadDir(c.ConfigMgr.GetGlobalConfig().Paths.PidFolder)
	if err != nil {
		log.Println(err)
		return
	}

	for _, file := range files {
		if file.IsDir() {
			continue
		}
		pid := file.Name()
		c.stopProcess(&pid)
		c.removeProcessFile(&pid)
	}
}

// runAndInitContainerOne starts the container and then initialises it with the
// sensor configs whose Crawler.Container equals the container name.
func (c *CrawlerManager) runAndInitContainerOne(container *string) error {
	if err := c.runContainer(container); err != nil {
		return err
	}

	// Sensors are read only after the container is up, as before.
	grouped := sortContainer(c.ConfigMgr.GetSensors())
	if !callInitConfig(container, grouped[*container]) {
		return errors.New("call init failed")
	}
	return nil
}

// runAndInitContainer starts the container and then initialises it with the
// given configs.
func (c *CrawlerManager) runAndInitContainer(container *string, cl []*config_manager.Config) error {
	if err := c.runContainer(container); err != nil {
		return err
	}
	if !callInitConfig(container, cl) {
		return errors.New("call init failed")
	}
	return nil
}

// sortContainer groups the configs by their Crawler.Container value. The order
// of configs inside a group follows map iteration and is therefore not stable.
func sortContainer(cm map[string]*config_manager.Config) map[string][]*config_manager.Config {
	grouped := make(map[string][]*config_manager.Config)
	for _, conf := range cm {
		name := conf.Crawler.Container
		grouped[name] = append(grouped[name], conf)
	}
	return grouped
}

// runContainer starts the container process unless its status check already
// succeeds.
//
// The command from getRunCommand is launched with the current port as its only
// argument. After a two second pause the port is probed with
// callStatusAddress; on failure the process is killed and the next port is
// tried. On success the pid is written with writePid and the port and pid are
// recorded under the container name.
//
// It returns the error from starting the process, or an error once the port
// counter has passed the highest valid TCP port.
func (c *CrawlerManager) runContainer(container *string) error {
	const maxPort = 65535

	if c.checkContainer(container) {
		return nil
	}

	cmdStr := getRunCommand(container)
	for {
		if c.currentPort > maxPort {
			return errors.New("no port left to start container " + *container)
		}

		port := strconv.Itoa(c.currentPort)
		cmd := exec.Command(cmdStr, port)
		if err := cmd.Start(); err != nil {
			// The process could not be started at all: give up.
			log.Println(err)
			return err
		}

		time.Sleep(2 * time.Second)

		if !callStatusAddress(address + port) {
			log.Println("false " + port)
			// The process never answered on this port. Kill and reap it so
			// it is not left running untracked while another one is started.
			if err := cmd.Process.Kill(); err != nil {
				log.Println(err)
			}
			// The exit status of a killed process carries no information.
			_ = cmd.Wait()
			c.currentPort++
			continue
		}

		log.Println(*container+" run success port:", c.currentPort, "pid:", cmd.Process.Pid)
		writePid(cmd.Process.Pid)
		c.portMap[*container] = port
		c.pidMap[*container] = strconv.Itoa(cmd.Process.Pid)
		c.currentPort++
		log.Println(*container + "started")
		return nil
	}
}

// addSensor makes sure the sensor's container is running and then registers
// the sensor with it through callAdd.
//
// NOTE(review): the container is checked/started under Crawler.Name while
// callAdd is addressed with Crawler.Container, and sortContainer groups by
// Crawler.Container. Confirm that both fields are meant here.
func (c *CrawlerManager) addSensor(id string) error {
	conf := c.ConfigMgr.GetSensorById(id)

	if !c.checkAndRunContainer(&conf.Crawler.Name) {
		return errors.New("run container error")
	}
	if !callAdd(&conf.Crawler.Container, conf) {
		return errors.New("Call Add Fail")
	}
	return nil
}

// removeSensor asks the container to drop the sensor through callRemove; the
// result of that call is not inspected.
//
// NOTE(review): this addresses the container by Crawler.Name whereas
// updateSensor uses Crawler.Container. Confirm which one is intended.
func (c *CrawlerManager) removeSensor(id string) {
	conf := c.ConfigMgr.GetSensorById(id)
	callRemove(&conf.Crawler.Name, conf)
	//remove and stop
}

// updateSensor pushes the sensor's current config to its container through
// callInitConfigOne.
func (c *CrawlerManager) updateSensor(id string) error {
	conf := c.ConfigMgr.GetSensorById(id)
	if !callInitConfigOne(&conf.Crawler.Container, conf) {
		return errors.New("update sensor error")
	}
	return nil
}

// stopContainerAll stops every container that has a recorded pid.
func (c *CrawlerManager) stopContainerAll() {
	// stopContainer deletes from pidMap; deleting during range is permitted.
	for k := range c.pidMap {
		name := k
		c.stopContainer(&name)
	}
}

// stopContainer kills the container's process, removes its pid file and
// forgets its pid and port. A container without a recorded pid is ignored.
func (c *CrawlerManager) stopContainer(container *string) {
	pid := c.pidMap[*container]
	if pid == "" {
		return
	}

	c.stopProcess(&pid)
	c.removeProcessFile(&pid)
	delete(c.pidMap, *container)
	delete(c.portMap, *container)
}

// stopProcess kills the process whose pid is given as a decimal string.
//
// A pid that does not parse or a process that cannot be looked up is logged
// and nothing is killed; a failing kill is logged as well.
func (c *CrawlerManager) stopProcess(pid *string) {
	pidi, err := strconv.Atoi(*pid)
	if err != nil {
		log.Println(err)
		return
	}

	p, err := os.FindProcess(pidi)
	if err != nil {
		log.Println(err)
		return
	}

	if err := p.Kill(); err != nil {
		log.Println(err)
	}
}

// removeProcessFile deletes the file named after the pid from the pid folder
// and logs a failure to do so.
func (c *CrawlerManager) removeProcessFile(pid *string) {
	err := os.Remove(c.ConfigMgr.GetGlobalConfig().Paths.PidFolder + "/" + *pid)
	if err != nil {
		log.Println(err)
	}
}