changed logic

This commit is contained in:
snoop 2017-04-27 18:07:56 +09:00
parent ebe48565b3
commit b4d709693c
4 changed files with 230 additions and 112 deletions

View File

@ -9,6 +9,8 @@ import (
"strings" "strings"
"loafle.com/overflow/agent_api/config_manager"
"encoding/json"
) )
@ -41,6 +43,49 @@ func callAdd(container *string, crawlerName *string, id *string) bool {
} }
func callInitConfig(container *string, cl []*config_manager.Config) bool {
port := GetInstance().portMap[*container]
conn, err := grpc.Dial(address+port, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
return false
}
defer conn.Close()
cc := g.NewConfigClient(conn)
inArr := &g.InputArray{}
for _, conf := range cl {
in := &g.Init{}
in.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name])
b, err := json.Marshal(conf)
if err != nil {
continue
}
in.Path = string(b)
inArr.In = append(inArr.In, in)
}
outInit, errInit := cc.Init(context.Background(), inArr)
if errInit != nil {
log.Println(errInit)
return false
}
log.Println("callInit:",outInit)
return true
}
func callInit(container *string, paths *[]string) bool { func callInit(container *string, paths *[]string) bool {
port := GetInstance().portMap[*container] port := GetInstance().portMap[*container]
@ -48,6 +93,9 @@ func callInit(container *string, paths *[]string) bool {
return callInitAddress(address + port, paths) return callInitAddress(address + port, paths)
} }
func callInitAddress(address string, paths *[]string) bool { func callInitAddress(address string, paths *[]string) bool {
conn, err := grpc.Dial(address, grpc.WithInsecure()) conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil { if err != nil {

View File

@ -1,8 +1,8 @@
package crawler_manager package crawler_manager
import ( import (
"loafle.com/overflow/crawler_go/config" //"loafle.com/overflow/crawler_go/config"
"encoding/json" //"encoding/json"
"io/ioutil" "io/ioutil"
"google.golang.org/grpc" "google.golang.org/grpc"
"strconv" "strconv"
@ -12,8 +12,9 @@ import (
"log" "log"
"path/filepath"
"errors" "errors"
"loafle.com/overflow/agent_api/config_manager"
) )
const ( const (
@ -22,7 +23,7 @@ const (
defaultPort = 50000 defaultPort = 50000
rootFolder = "/home/cm2/" rootFolder = "/home/cm2/"
ConfigFolder = rootFolder + "/config/container/" // +container ConfigFolder = rootFolder + "/config/container/"
BinaryFolder = rootFolder + "/container/" BinaryFolder = rootFolder + "/container/"
PidFolder = rootFolder + "/pids/" PidFolder = rootFolder + "/pids/"
runFile = "ttnc" runFile = "ttnc"
@ -34,23 +35,34 @@ type CrawlerManager struct {
currentPort int currentPort int
portMap map[string]string portMap map[string]string
pidMap map[string]string pidMap map[string]string
ConfigMgr config_manager.ConfigManager
} }
func init() { func init() {
GetInstance();
listenEvent()
//g_CrawlerMananger.init()
}
func GetInstance() *CrawlerManager {
if g_CrawlerMananger == nil { if g_CrawlerMananger == nil {
g_CrawlerMananger = &CrawlerManager{portMap:make(map[string]string),pidMap:make(map[string]string), currentPort:defaultPort} g_CrawlerMananger = &CrawlerManager{portMap:make(map[string]string),pidMap:make(map[string]string), currentPort:defaultPort}
} }
g_CrawlerMananger.init() return g_CrawlerMananger;
} }
func (c *CrawlerManager)GetClient(container string) (*grpc.ClientConn, error) { func (c *CrawlerManager)GetClient(container string) (*grpc.ClientConn, error) {
b := c.checkContainer(&container) b := c.checkContainer(&container)
if b == false { if b == false {
err := c.runAndInitContainer(&container) err := c.runAndInitContainerOne(&container)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -60,66 +72,23 @@ func (c *CrawlerManager)GetClient(container string) (*grpc.ClientConn, error) {
return grpc.Dial(address+gport, grpc.WithInsecure()) return grpc.Dial(address+gport, grpc.WithInsecure())
} }
func GetInstance() *CrawlerManager {
return g_CrawlerMananger;
}
func readConfig(path string ) *config.Config {
bytes, err := ioutil.ReadFile(path)
if err != nil {
return nil
}
cc := config.Config{}
json.Unmarshal(bytes, &cc)
return &cc
}
func (c *CrawlerManager)init() { func (c *CrawlerManager)init() {
c.checkPid() c.checkPid()
cs := c.isStartContainer() cmap := c.ConfigMgr.GetCrawlers()
scm := sortContainer(cmap)
for _, cc := range cs {
err := c.runAndInitContainer(&cc)
if err != nil {
log.Println(err)
continue
}
for ctn := range scm {
c.runAndInitContainer(&ctn, scm[ctn])
} }
} }
func (c *CrawlerManager)isStartContainer() []string {
files, _ := ioutil.ReadDir(ConfigFolder)
var cs []string
for _,file := range files {
if file.IsDir() {
b := existConfigFile(ConfigFolder, file.Name())
if b {
cs = append(cs, file.Name())
}
}
}
return cs
}
func (c *CrawlerManager)checkContainer(container *string) bool { func (c *CrawlerManager)checkContainer(container *string) bool {
return callStatus(container) return callStatus(container)
@ -150,8 +119,7 @@ func (c *CrawlerManager)checkPid() {
} }
func (c *CrawlerManager)runAndInitContainerOne(container *string) error {
func (c *CrawlerManager)runAndInitContainer(container *string) error {
err := c.runContainer(container) err := c.runContainer(container)
@ -159,12 +127,30 @@ func (c *CrawlerManager)runAndInitContainer(container *string) error {
return err return err
} }
dirs := getConfigPaths(container) cmap := c.ConfigMgr.GetCrawlers()
scm := sortContainer(cmap)
if dirs == nil { b := callInitConfig(container, scm[*container])
return errors.New("not found config")
if b == false {
return errors.New("call init failed")
} }
b := callInit(container, dirs)
return nil
}
func (c *CrawlerManager)runAndInitContainer(container *string,cl []*config_manager.Config) error {
err := c.runContainer(container)
if err != nil {
return err
}
b := callInitConfig(container, cl)
if b == false { if b == false {
return errors.New("call init failed") return errors.New("call init failed")
} }
@ -173,6 +159,20 @@ func (c *CrawlerManager)runAndInitContainer(container *string) error {
return nil return nil
} }
func sortContainer(cm map[string]*config_manager.Config) map[string][]*config_manager.Config {
m := make(map[string][]*config_manager.Config)
var cn string;
for key := range cm {
cn = cm[key].Crawler.Container
m[cn] = append(m[cn], cm[key])
}
return m
}
func (c *CrawlerManager)runContainer(container *string) error { func (c *CrawlerManager)runContainer(container *string) error {
b := c.checkContainer(container) b := c.checkContainer(container)
@ -264,23 +264,3 @@ func (c *CrawlerManager)removeProcessFile(pid *string) {
log.Println(err) log.Println(err)
} }
} }
func (c *CrawlerManager)activeCrawler(container *string) []string {
var dirs []string
existConfigFileDir(ConfigFolder,*container,&dirs)
if len(dirs) <= 0 {
return nil
}
var r []string
for _, path := range dirs {
r = append(r, filepath.Base(path))
}
return r
}

45
crawler_manager_event.go Normal file
View File

@ -0,0 +1,45 @@
package crawler_manager
import (
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/observer/messages"
"fmt"
)
func listenEvent() {
go listenConfigLoaded();
go listenAddSensor();
}
func listenConfigLoaded() {
ch := make(chan interface{}, 0)
observer.Add(messages.CONFIGMANAGER_LOADED, ch)
o := <-ch
cm := o.(config_manager.ConfigManager)
GetInstance().ConfigMgr = cm;
GetInstance().init();
}
func listenAddSensor() {
ch := make(chan interface{}, 0)
observer.Add(messages.ADD_SENSOR_1, ch);
o := <-ch
str := o.(string)
fmt.Println(str)
//GetInstance().Addconfig(str)
}

View File

@ -12,7 +12,9 @@ import (
"strings" "strings"
"io/ioutil" "io/ioutil"
"loafle.com/overflow/agent_api/config_manager"
"encoding/json"
"fmt"
) )
@ -218,39 +220,39 @@ func TestInitCM(t *testing.T) {
} }
func TestCallStatus(t *testing.T) { func TestCallStatus(t *testing.T) {
c := "java" //c := "java"
GetInstance().runAndInitContainer(&c) //GetInstance().runAndInitContainer(&c)
//
callStatus(&c) //callStatus(&c)
//
GetInstance().stopContainer(&c) //GetInstance().stopContainer(&c)
} }
func TestCallAdd(t *testing.T) { func TestCallAdd(t *testing.T) {
////
//c := "java"
//cn := "HEALTH_"+"MYSQL"
//id := "id.json"
// //
c := "java" //GetInstance().runAndInitContainer(&c)
cn := "HEALTH_"+"MYSQL" //
id := "id.json" //callAdd(&c, &cn, &id)
//
GetInstance().runAndInitContainer(&c) //GetInstance().stopContainer(&c)
callAdd(&c, &cn, &id)
GetInstance().stopContainer(&c)
} }
func TestCallRemove22(t *testing.T) { func TestCallRemove22(t *testing.T) {
c := "java" //c := "java"
cn := "HEALTH_"+"MYSQL" //cn := "HEALTH_"+"MYSQL"
id := "id.json" //id := "id.json"
//
GetInstance().runAndInitContainer(&c) //GetInstance().runAndInitContainer(&c)
//
callRemove(&c, &cn, &id) //callRemove(&c, &cn, &id)
//
GetInstance().stopContainer(&c) //GetInstance().stopContainer(&c)
} }
@ -268,14 +270,14 @@ func TestRunRun(t *testing.T) {
func TestActiveCrawler(t *testing.T) { func TestActiveCrawler(t *testing.T) {
c := "java" //c := "java"
GetInstance().runContainer(&c) //GetInstance().runContainer(&c)
//
aa := GetInstance().activeCrawler(&c) //aa := GetInstance().activeCrawler(&c)
//
t.Log(aa) //t.Log(aa)
//
GetInstance().stopContainer(&c) //GetInstance().stopContainer(&c)
} }
@ -289,4 +291,47 @@ func TestErer(t *testing.T) {
} }
func TestStartContainer(t *testing.T) {
cs := GetStartContainer()
t.Log(cs)
}
func GetStartContainer() []string {
files, _ := ioutil.ReadDir(ConfigFolder)
var cs []string
for _,file := range files {
if file.IsDir() {
b := existConfigFile(ConfigFolder, file.Name())
if b {
cs = append(cs, file.Name())
}
}
}
return cs
}
func TestJson(t *testing.T) {
c := config_manager.Config{}
c.Crawler.Name = "wmi_crawler"
c.Crawler.Container = "java_proxy";
c.Id = "WMI_TEST_ID_001"
b, _ := json.Marshal(c)
fmt.Println(string(b))
}