From c91b5df21f4b541d1a375781c3cc6427cafdb6ee Mon Sep 17 00:00:00 2001 From: snoop Date: Sat, 15 Apr 2017 20:10:33 +0900 Subject: [PATCH] incompleted cm --- crawler_communicator.go | 109 ++++++++--------- crawler_manager.go | 263 +++++++++++++++++++++++++++++++--------- crawler_manager_test.go | 70 +++++++---- crawler_runner.go | 55 --------- crawler_util.go | 43 +++++++ 5 files changed, 348 insertions(+), 192 deletions(-) delete mode 100644 crawler_runner.go create mode 100644 crawler_util.go diff --git a/crawler_communicator.go b/crawler_communicator.go index 09ebd9a..ff1fda7 100644 --- a/crawler_communicator.go +++ b/crawler_communicator.go @@ -7,16 +7,19 @@ import ( g "loafle.com/overflow/crawler_go/grpc" "context" - "encoding/json" + "strings" ) -func CallAdd() { +func callAdd(container *string, crawlerName *string, id *string) bool { - conn, err := grpc.Dial(address, grpc.WithInsecure()) + port := GetInstance().portMap[*container] + + conn, err := grpc.Dial(address+port, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) + return false } defer conn.Close() @@ -24,18 +27,28 @@ func CallAdd() { in := &g.Input{} - in.Id = "test_redis_sid" - in.Name = g.Crawlers_HEALTH_REDIS + in.Id = *id + in.Name = g.Crawlers(g.Crawlers_value[*crawlerName]) out, err := cc.Add(context.Background(), in) if err != nil { log.Println(err) } - log.Println(out) + log.Println("callAdd:", out) + + return true } -func CallInit2(address string, paths *[]string) bool { + +func callInit(container *string, paths *[]string) bool { + + port := GetInstance().portMap[*container] + + return callInitAddress(address + port, paths) +} + +func callInitAddress(address string, paths *[]string) bool { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) @@ -50,15 +63,13 @@ func CallInit2(address string, paths *[]string) bool { for _, path := range *paths { in := &g.Init{} - in.Path = "/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/" - //in.Path = path + //in.Path = "/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/" + in.Path = path + "/" bcn := filepath.Base(path) bcn = strings.ToUpper(bcn) - log.Println("cn:" + base+bcn) - - //in.Name = g.Crawlers(g.Crawlers_value[base+bcn]) - in.Name = g.Crawlers(g.Crawlers_value[g.Crawlers_HEALTH_REDIS.String()]) //test + in.Name = g.Crawlers(g.Crawlers_value[base+bcn]) + //in.Name = g.Crawlers(g.Crawlers_value[g.Crawlers_HEALTH_REDIS.String()]) //test inArr.In = append(inArr.In, in) } @@ -67,43 +78,18 @@ func CallInit2(address string, paths *[]string) bool { log.Println(errInit) return false } - log.Println(outInit) + log.Println("callInit:",outInit) return true } -func CallInit(crawlerName string, path string) { - port := portMap[crawlerName] +func callRemove(container *string, crawlerName *string, id *string) { + + port := GetInstance().portMap[*container] conn, err := grpc.Dial(address+port, grpc.WithInsecure()) - if err != nil { - log.Fatalf("did not connect: %v", err) - } - defer conn.Close() - cc := g.NewConfigClient(conn) - - in := &g.Init{} - - in.Path = filepath.Dir(path)+ "/" - - in.Name = g.Crawlers(g.Crawlers_value[crawlerName]) - - inArr := &g.InputArray{} - inArr.In = append(inArr.In, in) - - outInit, errInit := cc.Init(context.Background(), inArr) - if errInit != nil { - log.Println(errInit) - } - log.Println(outInit) - -} - -func CallRemove() { - - conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } @@ -112,36 +98,47 @@ func CallRemove() { cc := g.NewConfigClient(conn) inR := &g.Input{} - inR.Id = "123123" - inR.Name = g.Crawlers_HEALTH_DNS + inR.Id = *id + inR.Name = g.Crawlers(g.Crawlers_value[*crawlerName]) outRem, errRem := cc.Remove(context.Background(), inR) if errRem != nil { log.Println(errRem) } - log.Println(outRem) + log.Println("callRemove:",outRem) } -func CallGet() { - conn, err := grpc.Dial("localhost:50000", grpc.WithInsecure()) + +func callStatus(container string) bool { + + port := GetInstance().portMap[container] + + if port == "" { + return false + } + + return callStatusAddress(address +port) + +} + +func callStatusAddress(addr string) bool { + + conn, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() - c := g.NewDataClient(conn) - in := &g.Input{ - Name: g.Crawlers_HEALTH_REDIS, - Id: "test_redis_sid", - } + c := g.NewStatusClient(conn) + + e := &g.Empty{} + out, err := c.Status(context.Background(), e) - out, err := c.Get(context.Background(), in) if err != nil { log.Fatalf("could not greet: %v", err) } - var check bool - json.Unmarshal(out.Data, &check) - log.Println(check) + return out.Check + } \ No newline at end of file diff --git a/crawler_manager.go b/crawler_manager.go index e68ea35..cc7a48f 100644 --- a/crawler_manager.go +++ b/crawler_manager.go @@ -1,17 +1,17 @@ package crawler_manager import ( - - - - "loafle.com/overflow/crawler_go/config" - "encoding/json" - "io/ioutil" - "google.golang.org/grpc" + "strconv" + "os/exec" + "time" + "os" + "log" + + "github.com/kataras/go-errors" ) const ( @@ -19,41 +19,53 @@ const ( defaultPort = 50000 rootFolder = "/home/cm/" - ConfigFolder = rootFolder + "/config/" + ConfigFolder = rootFolder + "/config/" // +container BinaryFolder = rootFolder + "/container/" PidFolder = rootFolder + "/pids/" - runFile = "tnc" + runFile = "ttnc" ) +var g_CrawlerMananger *CrawlerManager = nil type CrawlerManager struct { - + currentPort int + portMap map[string]string + pidMap map[string]string } -var currentPort = defaultPort - -var pidMap map[string]int - -var portMap map[string]string - func init() { - pidMap = make(map[string]int) + if g_CrawlerMananger == nil { + g_CrawlerMananger = &CrawlerManager{portMap:make(map[string]string),pidMap:make(map[string]string), currentPort:defaultPort} + } - portMap = make(map[string]string) + //g_CrawlerMananger.init() +} +func GetInstance() *CrawlerManager { + return g_CrawlerMananger; } func (c *CrawlerManager)GetClient(container string,port string) (*grpc.ClientConn, error) { - port = portMap[container] + b := c.checkContainer(container) + if b == false { + //c.runContainer(&container) + //CallInit() + err := c.runAndInitContainer(&container) + if err != nil { + return nil, err + } + } - return grpc.Dial("localhost:"+port, grpc.WithInsecure()) + gport := c.portMap[container] + + return grpc.Dial(address+gport, grpc.WithInsecure()) } -func ReadConfig(path string ) *config.Config { +func readConfig(path string ) *config.Config { bytes, err := ioutil.ReadFile(path) @@ -61,33 +73,46 @@ func ReadConfig(path string ) *config.Config { return nil } - c := config.Config{} - - json.Unmarshal(bytes, &c) - - return &c + cc := config.Config{} + json.Unmarshal(bytes, &cc) + return &cc } -func InitContainer() { +func (c *CrawlerManager)init() { - cs := IsStartContainer() + c.checkPid() + + cs := c.isStartContainer() + + //var cpm map[string][]string = make(map[string][]string) + + //var ccl []string + for _, cc := range cs { + + err := c.runAndInitContainer(&cc) + + if err != nil { + log.Println(err) + continue + } + + //existConfigFileDir(ConfigFolder, cc, &ccl) + //cpm[cc] = ccl + //err := c.runContainer(&cc) + //if err != nil { + // callInit(cc, &ccl) + //} - var cpm map[string][]string = make(map[string][]string) - var ccl []string - for _, c := range cs { - ExistConfigFileDir(ConfigFolder, c, &ccl) - cpm[c] = ccl - RunContainer(&c, &cpm) } } -func IsStartContainer() []string { +func (c *CrawlerManager)isStartContainer() []string { files, _ := ioutil.ReadDir(ConfigFolder) @@ -96,7 +121,7 @@ func IsStartContainer() []string { for _,file := range files { if file.IsDir() { - b := ExistConfigFile(ConfigFolder, file.Name()) + b := existConfigFile(ConfigFolder, file.Name()) if b { cs = append(cs, file.Name()) } @@ -106,40 +131,158 @@ func IsStartContainer() []string { return cs } -func ExistConfigFile(prePath string,dir string) bool { - - files, _ := ioutil.ReadDir(prePath + "/" +dir) - - for _,file := range files { - - if file.IsDir() { - retB := ExistConfigFile(prePath + "/" + dir, file.Name()) - if retB { - return true - } - } else { - return true - } - - } - return false +func (c *CrawlerManager)checkContainer(container string) bool { + return callStatus(container) } -func ExistConfigFileDir(prePath string,dir string, configCrawler *[]string) { - files, _ := ioutil.ReadDir(prePath + "/" +dir) +func (c *CrawlerManager)checkPid() { - for _,file := range files { + files, err := ioutil.ReadDir(PidFolder) + + if err != nil { + log.Println(err) + return + } + + for _, file := range files { if file.IsDir() { - ExistConfigFileDir(prePath + "/" + dir, file.Name(), configCrawler) - } else { - *configCrawler = append(*configCrawler, prePath + "/" +dir) - return; + continue } + str := file.Name() + + c.stopProcess(&str) + c.removeProcessFile(&str) + } +} + + +func (c *CrawlerManager)runAndInitContainer(container *string) error { + + err := c.runContainer(container) + + if err != nil { + return err + } + + dirs := getConfigPaths(container) + + if dirs == nil { + return errors.New("not found config") + } + b := callInit(container, dirs) + if b == false { + return errors.New("call init failed") + } + + + return nil +} + +func (c *CrawlerManager)runContainer(container *string) error { + + //chekc is Run? + + cmdStr := getRunCommand(container) + + for { + pArg := "-Port=" + strconv.Itoa(c.currentPort) + cmd := exec.Command(cmdStr, pArg) + + err := cmd.Start() + if err != nil { + //run error break; + log.Println(err) + return err + } + + time.Sleep(time.Duration( time.Second * 2)) + + b := callStatusAddress(address + strconv.Itoa(c.currentPort)) + + if b == false { + c.currentPort++ + continue; + } + log.Println(*container + " run success port:" , c.currentPort , "pid:", cmd.Process.Pid ) + + writePid(cmd.Process.Pid) + + c.portMap[*container] = strconv.Itoa(c.currentPort) + c.pidMap[*container] = strconv.Itoa(cmd.Process.Pid) + + c.currentPort++ + + break; + } + + return nil +} + +func (c *CrawlerManager)stopContainerAll() { + + for k, _ := range c.pidMap { + c.stopContainer(&k) + } + +} + +func (c *CrawlerManager)stopContainer(container *string) { + + pid := c.pidMap[*container] + + if len(pid) <= 0 { + return + } + + c.stopProcess(&pid) + c.removeProcessFile(&pid) + + delete(c.pidMap, *container) + delete(c.portMap, *container) + +} + +func (c *CrawlerManager)stopProcess(pid *string) { + + pidi, err := strconv.Atoi(*pid) + + if err != nil { + log.Println(err) + } + + p, err := os.FindProcess(pidi) + if err != nil { + log.Println(err) + } + p.Kill() + } +func (c *CrawlerManager)removeProcessFile(pid *string) { + err := os.Remove(PidFolder + "/" +*pid) + if err != nil { + log.Println(err) + } +} + +func writePid(pid int) { + ioutil.WriteFile(PidFolder + strconv.Itoa(pid), []byte(""), os.ModePerm) +} + +func getConfigPaths(container *string) *[]string { + + var dirs []string + existConfigFileDir(ConfigFolder, *container, &dirs) + + return &dirs +} + + + + diff --git a/crawler_manager_test.go b/crawler_manager_test.go index c854038..670cb22 100644 --- a/crawler_manager_test.go +++ b/crawler_manager_test.go @@ -13,20 +13,13 @@ import ( "io/ioutil" ) -func TestAdd(t *testing.T) { - CallAdd(); - -} - func TestCallInit(t *testing.T) { //CallInit("") } -func TestCallRemove(t *testing.T) { - CallRemove() -} + func TestPid(t *testing.T) { @@ -106,8 +99,8 @@ func TestCom(t *testing.T) { func TestReadConfig(t *testing.T) { - c := ReadConfig("/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/example.json") - t.Log(c) + //c := ReadConfig("/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/example.json") + //t.Log(c) } @@ -126,9 +119,9 @@ func TestMyPid(t *testing.T) { func TestDir22(t *testing.T) { - cs := IsStartContainer() + //cs := IsStartContainer() - t.Log(cs) + //t.Log(cs) } @@ -180,19 +173,19 @@ func TestCrateDir(t *testing.T) { } func TestInit(t *testing.T) { - InitContainer() + //InitContainer() } func TestDirs(t *testing.T) { - cs := IsStartContainer() - - for _, c := range cs { - var ccl []string - ExistConfigFileDir(ConfigFolder, c, &ccl) - t.Log(ccl) - - } + //cs := IsStartContainer() + // + //for _, c := range cs { + // var ccl []string + // ExistConfigFileDir(ConfigFolder, c, &ccl) + // t.Log(ccl) + // + //} } @@ -208,4 +201,39 @@ func TestSplitPath(t *testing.T) { t.Log(a) +} + +func TestInitCM(t *testing.T) { + GetInstance().init() + + GetInstance().stopContainerAll() +} + +func TestCallAdd(t *testing.T) { + // + c := "java" + cn := "HEALTH_"+"MYSQL" + id := "id.json" + + GetInstance().runAndInitContainer(&c) + + callAdd(&c, &cn, &id) + + GetInstance().stopContainer(&c) + +} + +func TestCallRemove22(t *testing.T) { + + c := "java" + cn := "HEALTH_"+"MYSQL" + id := "id.json" + + GetInstance().runAndInitContainer(&c) + + callRemove(&c, &cn, &id) + + GetInstance().stopContainer(&c) + + } \ No newline at end of file diff --git a/crawler_runner.go b/crawler_runner.go deleted file mode 100644 index 60f7c54..0000000 --- a/crawler_runner.go +++ /dev/null @@ -1,55 +0,0 @@ -package crawler_manager - -import ( - - "log" - - "os/exec" - "time" - "strconv" - - "io/ioutil" - "os" -) - - -func RunContainer(container *string, cpm *map[string][]string) { - cmdStr := getRunCommand(container) - - for { - pArg := "-Port=" + strconv.Itoa(currentPort) - cmd := exec.Command(cmdStr, pArg) - - err := cmd.Start() - if err != nil { - log.Println(err) - } - - time.Sleep(time.Duration( time.Second * 2)) - - paths := (*cpm)[*container] - b := CallInit2("localhost:" + strconv.Itoa(currentPort), &paths) - log.Println("current Port:" , currentPort) - if b == false { - currentPort++ - continue; - } - log.Println(*container + " run success port:" , currentPort , "pid:", cmd.Process.Pid ) - //write pid file container_pid_port - ioutil.WriteFile(PidFolder+*container+"_"+strconv.Itoa(cmd.Process.Pid)+ "_"+strconv.Itoa(currentPort), []byte(""), os.ModePerm) - - portMap[*container] = strconv.Itoa(currentPort) - - currentPort++ - - break; - } - - - -} - -func getRunCommand(container *string ) string { - return BinaryFolder + "/" + *container + "/" + runFile -} - diff --git a/crawler_util.go b/crawler_util.go new file mode 100644 index 0000000..1e07b50 --- /dev/null +++ b/crawler_util.go @@ -0,0 +1,43 @@ +package crawler_manager + +import "io/ioutil" + +func existConfigFile(prePath string,dir string) bool { + + files, _ := ioutil.ReadDir(prePath + "/" +dir) + + for _,file := range files { + + if file.IsDir() { + retB := existConfigFile(prePath + "/" + dir, file.Name()) + if retB { + return true + } + } else { + return true + } + + } + return false +} + +func existConfigFileDir(prePath string,dir string, configCrawler *[]string) { + + files, _ := ioutil.ReadDir(prePath + "/" +dir) + + for _,file := range files { + + if file.IsDir() { + existConfigFileDir(prePath + "/" + dir, file.Name(), configCrawler) + } else { + *configCrawler = append(*configCrawler, prePath + "/" +dir) + return; + } + + } +} + +func getRunCommand(container *string ) string { + return BinaryFolder + "/" + *container + "/" + runFile +} +