incompleted cm

This commit is contained in:
snoop 2017-04-15 20:10:33 +09:00
parent a68e9d5531
commit c91b5df21f
5 changed files with 348 additions and 192 deletions

View File

@ -7,16 +7,19 @@ import (
g "loafle.com/overflow/crawler_go/grpc" g "loafle.com/overflow/crawler_go/grpc"
"context" "context"
"encoding/json"
"strings" "strings"
) )
func CallAdd() { func callAdd(container *string, crawlerName *string, id *string) bool {
conn, err := grpc.Dial(address, grpc.WithInsecure()) port := GetInstance().portMap[*container]
conn, err := grpc.Dial(address+port, grpc.WithInsecure())
if err != nil { if err != nil {
log.Fatalf("did not connect: %v", err) log.Fatalf("did not connect: %v", err)
return false
} }
defer conn.Close() defer conn.Close()
@ -24,18 +27,28 @@ func CallAdd() {
in := &g.Input{} in := &g.Input{}
in.Id = "test_redis_sid" in.Id = *id
in.Name = g.Crawlers_HEALTH_REDIS in.Name = g.Crawlers(g.Crawlers_value[*crawlerName])
out, err := cc.Add(context.Background(), in) out, err := cc.Add(context.Background(), in)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
log.Println(out) log.Println("callAdd:", out)
return true
} }
func CallInit2(address string, paths *[]string) bool {
func callInit(container *string, paths *[]string) bool {
port := GetInstance().portMap[*container]
return callInitAddress(address + port, paths)
}
func callInitAddress(address string, paths *[]string) bool {
conn, err := grpc.Dial(address, grpc.WithInsecure()) conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil { if err != nil {
log.Fatalf("did not connect: %v", err) log.Fatalf("did not connect: %v", err)
@ -50,15 +63,13 @@ func CallInit2(address string, paths *[]string) bool {
for _, path := range *paths { for _, path := range *paths {
in := &g.Init{} in := &g.Init{}
in.Path = "/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/" //in.Path = "/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/"
//in.Path = path in.Path = path + "/"
bcn := filepath.Base(path) bcn := filepath.Base(path)
bcn = strings.ToUpper(bcn) bcn = strings.ToUpper(bcn)
log.Println("cn:" + base+bcn) in.Name = g.Crawlers(g.Crawlers_value[base+bcn])
//in.Name = g.Crawlers(g.Crawlers_value[g.Crawlers_HEALTH_REDIS.String()]) //test
//in.Name = g.Crawlers(g.Crawlers_value[base+bcn])
in.Name = g.Crawlers(g.Crawlers_value[g.Crawlers_HEALTH_REDIS.String()]) //test
inArr.In = append(inArr.In, in) inArr.In = append(inArr.In, in)
} }
@ -67,43 +78,18 @@ func CallInit2(address string, paths *[]string) bool {
log.Println(errInit) log.Println(errInit)
return false return false
} }
log.Println(outInit) log.Println("callInit:",outInit)
return true return true
} }
func CallInit(crawlerName string, path string) {
port := portMap[crawlerName] func callRemove(container *string, crawlerName *string, id *string) {
port := GetInstance().portMap[*container]
conn, err := grpc.Dial(address+port, grpc.WithInsecure()) conn, err := grpc.Dial(address+port, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
cc := g.NewConfigClient(conn)
in := &g.Init{}
in.Path = filepath.Dir(path)+ "/"
in.Name = g.Crawlers(g.Crawlers_value[crawlerName])
inArr := &g.InputArray{}
inArr.In = append(inArr.In, in)
outInit, errInit := cc.Init(context.Background(), inArr)
if errInit != nil {
log.Println(errInit)
}
log.Println(outInit)
}
func CallRemove() {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil { if err != nil {
log.Fatalf("did not connect: %v", err) log.Fatalf("did not connect: %v", err)
} }
@ -112,36 +98,47 @@ func CallRemove() {
cc := g.NewConfigClient(conn) cc := g.NewConfigClient(conn)
inR := &g.Input{} inR := &g.Input{}
inR.Id = "123123" inR.Id = *id
inR.Name = g.Crawlers_HEALTH_DNS inR.Name = g.Crawlers(g.Crawlers_value[*crawlerName])
outRem, errRem := cc.Remove(context.Background(), inR) outRem, errRem := cc.Remove(context.Background(), inR)
if errRem != nil { if errRem != nil {
log.Println(errRem) log.Println(errRem)
} }
log.Println(outRem) log.Println("callRemove:",outRem)
} }
func CallGet() {
conn, err := grpc.Dial("localhost:50000", grpc.WithInsecure()) func callStatus(container string) bool {
port := GetInstance().portMap[container]
if port == "" {
return false
}
return callStatusAddress(address +port)
}
func callStatusAddress(addr string) bool {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil { if err != nil {
log.Fatalf("did not connect: %v", err) log.Fatalf("did not connect: %v", err)
} }
defer conn.Close() defer conn.Close()
c := g.NewDataClient(conn)
in := &g.Input{ c := g.NewStatusClient(conn)
Name: g.Crawlers_HEALTH_REDIS,
Id: "test_redis_sid", e := &g.Empty{}
} out, err := c.Status(context.Background(), e)
out, err := c.Get(context.Background(), in)
if err != nil { if err != nil {
log.Fatalf("could not greet: %v", err) log.Fatalf("could not greet: %v", err)
} }
var check bool return out.Check
json.Unmarshal(out.Data, &check)
log.Println(check)
} }

View File

@ -1,17 +1,17 @@
package crawler_manager package crawler_manager
import ( import (
"loafle.com/overflow/crawler_go/config" "loafle.com/overflow/crawler_go/config"
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"google.golang.org/grpc" "google.golang.org/grpc"
"strconv"
"os/exec"
"time"
"os"
"log"
"github.com/kataras/go-errors"
) )
const ( const (
@ -19,41 +19,53 @@ const (
defaultPort = 50000 defaultPort = 50000
rootFolder = "/home/cm/" rootFolder = "/home/cm/"
ConfigFolder = rootFolder + "/config/" ConfigFolder = rootFolder + "/config/" // +container
BinaryFolder = rootFolder + "/container/" BinaryFolder = rootFolder + "/container/"
PidFolder = rootFolder + "/pids/" PidFolder = rootFolder + "/pids/"
runFile = "tnc" runFile = "ttnc"
) )
var g_CrawlerMananger *CrawlerManager = nil
type CrawlerManager struct { type CrawlerManager struct {
currentPort int
portMap map[string]string
pidMap map[string]string
} }
var currentPort = defaultPort
var pidMap map[string]int
var portMap map[string]string
func init() { func init() {
pidMap = make(map[string]int) if g_CrawlerMananger == nil {
g_CrawlerMananger = &CrawlerManager{portMap:make(map[string]string),pidMap:make(map[string]string), currentPort:defaultPort}
}
portMap = make(map[string]string) //g_CrawlerMananger.init()
}
func GetInstance() *CrawlerManager {
return g_CrawlerMananger;
} }
func (c *CrawlerManager)GetClient(container string,port string) (*grpc.ClientConn, error) { func (c *CrawlerManager)GetClient(container string,port string) (*grpc.ClientConn, error) {
port = portMap[container] b := c.checkContainer(container)
if b == false {
//c.runContainer(&container)
//CallInit()
err := c.runAndInitContainer(&container)
if err != nil {
return nil, err
}
}
return grpc.Dial("localhost:"+port, grpc.WithInsecure()) gport := c.portMap[container]
return grpc.Dial(address+gport, grpc.WithInsecure())
} }
func ReadConfig(path string ) *config.Config { func readConfig(path string ) *config.Config {
bytes, err := ioutil.ReadFile(path) bytes, err := ioutil.ReadFile(path)
@ -61,33 +73,46 @@ func ReadConfig(path string ) *config.Config {
return nil return nil
} }
c := config.Config{} cc := config.Config{}
json.Unmarshal(bytes, &c)
return &c
json.Unmarshal(bytes, &cc)
return &cc
} }
func InitContainer() { func (c *CrawlerManager)init() {
cs := IsStartContainer() c.checkPid()
cs := c.isStartContainer()
//var cpm map[string][]string = make(map[string][]string)
//var ccl []string
for _, cc := range cs {
err := c.runAndInitContainer(&cc)
if err != nil {
log.Println(err)
continue
}
//existConfigFileDir(ConfigFolder, cc, &ccl)
//cpm[cc] = ccl
//err := c.runContainer(&cc)
//if err != nil {
// callInit(cc, &ccl)
//}
var cpm map[string][]string = make(map[string][]string)
var ccl []string
for _, c := range cs {
ExistConfigFileDir(ConfigFolder, c, &ccl)
cpm[c] = ccl
RunContainer(&c, &cpm)
} }
} }
func IsStartContainer() []string { func (c *CrawlerManager)isStartContainer() []string {
files, _ := ioutil.ReadDir(ConfigFolder) files, _ := ioutil.ReadDir(ConfigFolder)
@ -96,7 +121,7 @@ func IsStartContainer() []string {
for _,file := range files { for _,file := range files {
if file.IsDir() { if file.IsDir() {
b := ExistConfigFile(ConfigFolder, file.Name()) b := existConfigFile(ConfigFolder, file.Name())
if b { if b {
cs = append(cs, file.Name()) cs = append(cs, file.Name())
} }
@ -106,40 +131,158 @@ func IsStartContainer() []string {
return cs return cs
} }
func ExistConfigFile(prePath string,dir string) bool { func (c *CrawlerManager)checkContainer(container string) bool {
return callStatus(container)
files, _ := ioutil.ReadDir(prePath + "/" +dir)
for _,file := range files {
if file.IsDir() {
retB := ExistConfigFile(prePath + "/" + dir, file.Name())
if retB {
return true
}
} else {
return true
}
}
return false
} }
func ExistConfigFileDir(prePath string,dir string, configCrawler *[]string) {
files, _ := ioutil.ReadDir(prePath + "/" +dir) func (c *CrawlerManager)checkPid() {
for _,file := range files { files, err := ioutil.ReadDir(PidFolder)
if err != nil {
log.Println(err)
return
}
for _, file := range files {
if file.IsDir() { if file.IsDir() {
ExistConfigFileDir(prePath + "/" + dir, file.Name(), configCrawler) continue
} else {
*configCrawler = append(*configCrawler, prePath + "/" +dir)
return;
} }
str := file.Name()
c.stopProcess(&str)
c.removeProcessFile(&str)
} }
}
func (c *CrawlerManager)runAndInitContainer(container *string) error {
err := c.runContainer(container)
if err != nil {
return err
}
dirs := getConfigPaths(container)
if dirs == nil {
return errors.New("not found config")
}
b := callInit(container, dirs)
if b == false {
return errors.New("call init failed")
}
return nil
}
func (c *CrawlerManager)runContainer(container *string) error {
//chekc is Run?
cmdStr := getRunCommand(container)
for {
pArg := "-Port=" + strconv.Itoa(c.currentPort)
cmd := exec.Command(cmdStr, pArg)
err := cmd.Start()
if err != nil {
//run error break;
log.Println(err)
return err
}
time.Sleep(time.Duration( time.Second * 2))
b := callStatusAddress(address + strconv.Itoa(c.currentPort))
if b == false {
c.currentPort++
continue;
}
log.Println(*container + " run success port:" , c.currentPort , "pid:", cmd.Process.Pid )
writePid(cmd.Process.Pid)
c.portMap[*container] = strconv.Itoa(c.currentPort)
c.pidMap[*container] = strconv.Itoa(cmd.Process.Pid)
c.currentPort++
break;
}
return nil
}
func (c *CrawlerManager)stopContainerAll() {
for k, _ := range c.pidMap {
c.stopContainer(&k)
}
}
func (c *CrawlerManager)stopContainer(container *string) {
pid := c.pidMap[*container]
if len(pid) <= 0 {
return
}
c.stopProcess(&pid)
c.removeProcessFile(&pid)
delete(c.pidMap, *container)
delete(c.portMap, *container)
}
func (c *CrawlerManager)stopProcess(pid *string) {
pidi, err := strconv.Atoi(*pid)
if err != nil {
log.Println(err)
}
p, err := os.FindProcess(pidi)
if err != nil {
log.Println(err)
}
p.Kill()
} }
func (c *CrawlerManager)removeProcessFile(pid *string) {
err := os.Remove(PidFolder + "/" +*pid)
if err != nil {
log.Println(err)
}
}
func writePid(pid int) {
ioutil.WriteFile(PidFolder + strconv.Itoa(pid), []byte(""), os.ModePerm)
}
func getConfigPaths(container *string) *[]string {
var dirs []string
existConfigFileDir(ConfigFolder, *container, &dirs)
return &dirs
}

View File

@ -13,20 +13,13 @@ import (
"io/ioutil" "io/ioutil"
) )
func TestAdd(t *testing.T) {
CallAdd();
}
func TestCallInit(t *testing.T) { func TestCallInit(t *testing.T) {
//CallInit("") //CallInit("")
} }
func TestCallRemove(t *testing.T) {
CallRemove()
}
func TestPid(t *testing.T) { func TestPid(t *testing.T) {
@ -106,8 +99,8 @@ func TestCom(t *testing.T) {
func TestReadConfig(t *testing.T) { func TestReadConfig(t *testing.T) {
c := ReadConfig("/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/example.json") //c := ReadConfig("/home/snoop/develop/path/go/src/loafle.com/overflow/crawler_go/config/example.json")
t.Log(c) //t.Log(c)
} }
@ -126,9 +119,9 @@ func TestMyPid(t *testing.T) {
func TestDir22(t *testing.T) { func TestDir22(t *testing.T) {
cs := IsStartContainer() //cs := IsStartContainer()
t.Log(cs) //t.Log(cs)
} }
@ -180,19 +173,19 @@ func TestCrateDir(t *testing.T) {
} }
func TestInit(t *testing.T) { func TestInit(t *testing.T) {
InitContainer() //InitContainer()
} }
func TestDirs(t *testing.T) { func TestDirs(t *testing.T) {
cs := IsStartContainer() //cs := IsStartContainer()
//
for _, c := range cs { //for _, c := range cs {
var ccl []string // var ccl []string
ExistConfigFileDir(ConfigFolder, c, &ccl) // ExistConfigFileDir(ConfigFolder, c, &ccl)
t.Log(ccl) // t.Log(ccl)
//
} //}
} }
@ -208,4 +201,39 @@ func TestSplitPath(t *testing.T) {
t.Log(a) t.Log(a)
}
func TestInitCM(t *testing.T) {
GetInstance().init()
GetInstance().stopContainerAll()
}
func TestCallAdd(t *testing.T) {
//
c := "java"
cn := "HEALTH_"+"MYSQL"
id := "id.json"
GetInstance().runAndInitContainer(&c)
callAdd(&c, &cn, &id)
GetInstance().stopContainer(&c)
}
func TestCallRemove22(t *testing.T) {
c := "java"
cn := "HEALTH_"+"MYSQL"
id := "id.json"
GetInstance().runAndInitContainer(&c)
callRemove(&c, &cn, &id)
GetInstance().stopContainer(&c)
} }

View File

@ -1,55 +0,0 @@
package crawler_manager
import (
"log"
"os/exec"
"time"
"strconv"
"io/ioutil"
"os"
)
func RunContainer(container *string, cpm *map[string][]string) {
cmdStr := getRunCommand(container)
for {
pArg := "-Port=" + strconv.Itoa(currentPort)
cmd := exec.Command(cmdStr, pArg)
err := cmd.Start()
if err != nil {
log.Println(err)
}
time.Sleep(time.Duration( time.Second * 2))
paths := (*cpm)[*container]
b := CallInit2("localhost:" + strconv.Itoa(currentPort), &paths)
log.Println("current Port:" , currentPort)
if b == false {
currentPort++
continue;
}
log.Println(*container + " run success port:" , currentPort , "pid:", cmd.Process.Pid )
//write pid file container_pid_port
ioutil.WriteFile(PidFolder+*container+"_"+strconv.Itoa(cmd.Process.Pid)+ "_"+strconv.Itoa(currentPort), []byte(""), os.ModePerm)
portMap[*container] = strconv.Itoa(currentPort)
currentPort++
break;
}
}
func getRunCommand(container *string ) string {
return BinaryFolder + "/" + *container + "/" + runFile
}

43
crawler_util.go Normal file
View File

@ -0,0 +1,43 @@
package crawler_manager
import "io/ioutil"
func existConfigFile(prePath string,dir string) bool {
files, _ := ioutil.ReadDir(prePath + "/" +dir)
for _,file := range files {
if file.IsDir() {
retB := existConfigFile(prePath + "/" + dir, file.Name())
if retB {
return true
}
} else {
return true
}
}
return false
}
func existConfigFileDir(prePath string,dir string, configCrawler *[]string) {
files, _ := ioutil.ReadDir(prePath + "/" +dir)
for _,file := range files {
if file.IsDir() {
existConfigFileDir(prePath + "/" + dir, file.Name(), configCrawler)
} else {
*configCrawler = append(*configCrawler, prePath + "/" +dir)
return;
}
}
}
func getRunCommand(container *string ) string {
return BinaryFolder + "/" + *container + "/" + runFile
}