overflow_probe/crawler_manager/crawler_manager.go
2017-08-04 11:32:31 +09:00

328 lines
6.0 KiB
Go

package crawler_manager
import (
//"git.loafle.net/overflow/overflow_probe/crawler/config"
//"encoding/json"
"google.golang.org/grpc"
"io/ioutil"
"log"
"os"
"os/exec"
"strconv"
"time"
"errors"
"git.loafle.net/overflow/overflow_probe/agent_api/config_manager"
)
// Defaults used when launching crawler containers and dialing them.
const (
// address is the host prefix; a container's port string is appended to it
// to form the address that is dialed.
address = "localhost:"
// portArgsName looks like a command-line flag prefix for the port.
// NOTE(review): it is not referenced anywhere in the code visible here.
portArgsName = "-Port="
// defaultPort is the initial value of CrawlerManager.currentPort.
defaultPort = 50000
//rootFolder = "/home/cm2/"
//ConfigFolder = rootFolder + "/config/container/"
//BinaryFolder = rootFolder + "/container/"
//PidFolder = rootFolder + "/pids/"
//runFile = "ttnc"
)
// g_CrawlerMananger is the package-wide CrawlerManager instance. It starts
// out nil and is populated on the first call to GetInstance.
var g_CrawlerMananger *CrawlerManager = nil
// CrawlerManager launches crawler container processes and keeps track of the
// port and pid each one was started with.
type CrawlerManager struct {
// currentPort is the port tried for the next container launch; it starts at
// defaultPort and runContainer increments it after every attempt.
currentPort int
// portMap maps a container name to the port (decimal string) it was
// started on.
portMap map[string]string
// pidMap maps a container name to the pid (decimal string) of its process.
pidMap map[string]string
// ConfigMgr supplies the sensor configurations and the global path settings.
// NOTE(review): nothing in the code visible here assigns it — presumably the
// caller sets it before any method that reads configuration is used.
ConfigMgr config_manager.ConfigManager
}
// init runs at package load: it creates the singleton CrawlerManager via
// GetInstance and then calls listenEvent (defined outside this section).
// The manager's own init method is not invoked here (the call is commented
// out).
func init() {
GetInstance()
listenEvent()
//g_CrawlerMananger.init()
}
// SettingPath rewrites the binary, config and pid folder settings of the
// global configuration by prefixing each of them with the configured root
// folder. The strings are concatenated as they are; no path separator is
// inserted between the root and the sub-folder.
func SettingPath() {
	//test
	//GetInstance().ConfigMgr.GetGlobalConfig().Paths.RootFolder = "/home/cm2/"
	cfg := GetInstance().ConfigMgr.GetGlobalConfig()
	root := cfg.Paths.RootFolder

	cfg.Paths.BinaryFolder = root + cfg.Paths.BinaryFolder
	cfg.Paths.ConfigFolder = root + cfg.Paths.ConfigFolder
	cfg.Paths.PidFolder = root + cfg.Paths.PidFolder
}
// GetInstance returns the package-wide CrawlerManager, creating it on first
// use with empty port/pid maps and currentPort set to defaultPort.
// No locking is performed around the creation.
func GetInstance() *CrawlerManager {
	if g_CrawlerMananger != nil {
		return g_CrawlerMananger
	}
	g_CrawlerMananger = &CrawlerManager{
		currentPort: defaultPort,
		portMap:     make(map[string]string),
		pidMap:      make(map[string]string),
	}
	return g_CrawlerMananger
}
// GetClient returns a gRPC client connection to the named container.
//
// If the status check reports that the container is not running, it is
// started and initialised first; any error from that step is returned.
// An error is also returned when no port is recorded for the container:
// dialing the bare "localhost:" prefix would hand back a connection object
// that can never work, because grpc.Dial does not connect eagerly.
func (c *CrawlerManager) GetClient(container string) (*grpc.ClientConn, error) {
	if !c.checkContainer(&container) {
		if err := c.runAndInitContainerOne(&container); err != nil {
			return nil, err
		}
	}

	gport, ok := c.portMap[container]
	if !ok || gport == "" {
		return nil, errors.New("no port registered for container " + container)
	}
	return grpc.Dial(address+gport, grpc.WithInsecure())
}
// init cleans up processes recorded in the pid folder, then starts and
// initialises one container per group of sensor configurations. It stops at
// the first container that fails and returns that error.
func (c *CrawlerManager) init() error {
	c.checkPid()

	grouped := sortContainer(c.ConfigMgr.GetSensors())
	for name, configs := range grouped {
		if err := c.runAndInitContainer(&name, configs); err != nil {
			return err
		}
	}
	return nil
}
// checkContainer reports the result of callStatus for the given container.
func (c *CrawlerManager) checkContainer(container *string) bool {
	running := callStatus(container)
	return running
}
// checkAndRunContainer returns true when callStatus already reports the
// container as up. Otherwise it starts and initialises the container and
// reports whether that succeeded; the underlying error is not propagated.
func (c *CrawlerManager) checkAndRunContainer(container *string) bool {
	if callStatus(container) {
		return true
	}
	return c.runAndInitContainerOne(container) == nil
}
// checkPid walks the configured pid folder and, for every regular entry,
// treats the file name as a pid: the process is stopped and the file is
// removed. A failure to read the folder is logged and ends the scan.
func (c *CrawlerManager) checkPid() {
	pidDir := c.ConfigMgr.GetGlobalConfig().Paths.PidFolder
	entries, err := ioutil.ReadDir(pidDir)
	if err != nil {
		log.Println(err)
		return
	}

	for _, entry := range entries {
		if !entry.IsDir() {
			name := entry.Name()
			c.stopProcess(&name)
			c.removeProcessFile(&name)
		}
	}
}
// runAndInitContainerOne starts the container and then sends it the sensor
// configurations grouped under its name. It returns the start error, or
// "call init failed" when callInitConfig reports failure.
func (c *CrawlerManager) runAndInitContainerOne(container *string) error {
	if err := c.runContainer(container); err != nil {
		return err
	}

	grouped := sortContainer(c.ConfigMgr.GetSensors())
	if !callInitConfig(container, grouped[*container]) {
		return errors.New("call init failed")
	}
	return nil
}
// runAndInitContainer starts the container and then initialises it with the
// supplied configurations. The initialisation call is only made when the
// start succeeded.
func (c *CrawlerManager) runAndInitContainer(container *string, cl []*config_manager.Config) error {
	err := c.runContainer(container)
	switch {
	case err != nil:
		return err
	case !callInitConfig(container, cl):
		return errors.New("call init failed")
	}
	return nil
}
// sortContainer groups the given configurations by the container named in
// their Crawler.Container field and returns the groups keyed by that name.
func sortContainer(cm map[string]*config_manager.Config) map[string][]*config_manager.Config {
	grouped := make(map[string][]*config_manager.Config)
	for _, conf := range cm {
		name := conf.Crawler.Container
		grouped[name] = append(grouped[name], conf)
	}
	return grouped
}
// runContainer makes sure a process for the given container is running.
//
// If the status check already succeeds nothing is done. Otherwise the run
// command is started with the current port as its only argument, and after a
// two second grace period the new process is probed on that port. When the
// probe fails the process is killed and reaped (so failed attempts do not
// pile up as stray children) and the next port is tried. On success the pid
// is written out via writePid and the port and pid are recorded in the maps.
//
// It returns the error from starting the command, or an error once the port
// range is exhausted without a successful probe.
func (c *CrawlerManager) runContainer(container *string) error {
	if c.checkContainer(container) {
		return nil
	}

	// Highest valid TCP port; there is nothing left to try beyond it.
	const maxPort = 65535

	cmdStr := getRunCommand(container)
	for ; c.currentPort <= maxPort; c.currentPort++ {
		port := strconv.Itoa(c.currentPort)
		cmd := exec.Command(cmdStr, port)
		if err := cmd.Start(); err != nil {
			//run error break;
			log.Println(err)
			return err
		}

		// Give the child time to come up before probing it.
		time.Sleep(2 * time.Second)

		if !callStatusAddress(address + port) {
			log.Println("false " + port)
			// The child is not serving on this port: stop it before retrying.
			if err := cmd.Process.Kill(); err != nil {
				log.Println(err)
			}
			// Reap the child; its exit status is irrelevant after the kill.
			_ = cmd.Wait()
			continue
		}

		log.Println(*container+" run success port:", c.currentPort, "pid:", cmd.Process.Pid)
		writePid(cmd.Process.Pid)
		c.portMap[*container] = port
		c.pidMap[*container] = strconv.Itoa(cmd.Process.Pid)
		c.currentPort++ // this port is taken; the next launch starts after it
		log.Println(*container + "started")
		return nil
	}
	return errors.New("no port left to start container " + *container)
}
// addSensor makes sure the container hosting the sensor's crawler is running
// and then registers the sensor with it through callAdd.
//
// The container is identified by Crawler.Container, the same key that
// sortContainer groups by and that callAdd is addressed with.
// It returns an error when the sensor has no configuration, when the
// container cannot be started, or when callAdd reports failure.
func (c *CrawlerManager) addSensor(id string) error {
	conf := c.ConfigMgr.GetSensorById(id)
	if conf == nil {
		return errors.New("sensor config not found: " + id)
	}

	container := &conf.Crawler.Container
	if !c.checkAndRunContainer(container) {
		return errors.New("run container error")
	}
	if !callAdd(container, conf) {
		return errors.New("Call Add Fail")
	}
	return nil
}
// removeSensor asks the container hosting the sensor to drop it via
// callRemove. A sensor id without configuration is logged and ignored.
//
// The container is addressed by Crawler.Container, matching how callAdd and
// callInitConfigOne are addressed in addSensor and updateSensor.
// NOTE(review): callRemove is defined outside this section — confirm its
// first argument is the container name.
func (c *CrawlerManager) removeSensor(id string) {
	conf := c.ConfigMgr.GetSensorById(id)
	if conf == nil {
		log.Println("remove sensor: no config for id " + id)
		return
	}
	callRemove(&conf.Crawler.Container, conf)
	//remove and stop
}
// updateSensor re-sends the sensor's configuration to its container through
// callInitConfigOne. It returns an error when the sensor has no
// configuration or when the call reports failure.
func (c *CrawlerManager) updateSensor(id string) error {
	conf := c.ConfigMgr.GetSensorById(id)
	if conf == nil {
		return errors.New("sensor config not found: " + id)
	}
	if !callInitConfigOne(&conf.Crawler.Container, conf) {
		return errors.New("update sensor error")
	}
	return nil
}
// stopContainerAll stops every container that has an entry in pidMap.
func (c *CrawlerManager) stopContainerAll() {
	for name := range c.pidMap {
		c.stopContainer(&name)
	}
}
// stopContainer stops the process recorded for the container, removes its
// pid file and forgets the container's pid and port. It does nothing when no
// pid is recorded for the container.
func (c *CrawlerManager) stopContainer(container *string) {
	name := *container
	pid := c.pidMap[name]
	if pid == "" {
		return
	}

	c.stopProcess(&pid)
	c.removeProcessFile(&pid)

	delete(c.pidMap, name)
	delete(c.portMap, name)
}
// stopProcess kills the process whose pid is given as a decimal string.
//
// Every failure is logged and ends the attempt: a string that is not a
// number, a pid that is zero or negative (which never names a single
// process and may be interpreted by the OS as something broader), a failed
// process lookup, or a failed kill. Bailing out early also guarantees Kill
// is never called on a nil process handle.
func (c *CrawlerManager) stopProcess(pid *string) {
	pidi, err := strconv.Atoi(*pid)
	if err != nil {
		log.Println(err)
		return
	}
	if pidi <= 0 {
		log.Println("invalid pid " + *pid)
		return
	}

	p, err := os.FindProcess(pidi)
	if err != nil {
		log.Println(err)
		return
	}
	if err := p.Kill(); err != nil {
		log.Println(err)
	}
}
// removeProcessFile deletes the file named after the pid inside the
// configured pid folder. A removal error is logged and otherwise ignored.
func (c *CrawlerManager) removeProcessFile(pid *string) {
	pidFile := c.ConfigMgr.GetGlobalConfig().Paths.PidFolder + "/" + *pid
	if err := os.Remove(pidFile); err != nil {
		log.Println(err)
	}
}