This commit is contained in:
crusader 2018-03-15 22:52:23 +09:00
parent 4da07c417b
commit f073163bf7
17 changed files with 204 additions and 148 deletions

View File

@ -1,16 +1,46 @@
package client
package container
import (
"fmt"
"os/exec"
"path"
crc "git.loafle.net/commons_go/rpc/client"
crr "git.loafle.net/commons_go/rpc/registry"
ooccp "git.loafle.net/overflow/overflow_commons_go/config/probe"
oopcc "git.loafle.net/overflow/overflow_probe_container/client"
"git.loafle.net/overflow/overflow_probes/config"
)
func New(addr string, rpcInvoker crr.RPCInvoker) crc.Client {
ch := oopcc.NewClientHandler(rpcInvoker)
socketHandler := NewSocketHandler()
func New(port int, rpcInvoker crr.RPCInvoker) crc.Client {
addr := fmt.Sprintf("localhost:%d", port)
c := oopcc.New(addr, ch, socketHandler)
c := oopcc.New(addr, rpcInvoker)
return c
}
func GetContainerCommand(name string) (cmd *exec.Cmd, pidPath string) {
pidPath = path.Join(config.Config.Paths["root"], ooccp.PathPID, name+".pid")
switch name {
case ooccp.ContainerGeneralName:
javaPath := path.Join(config.Config.Paths["root"], ooccp.PathJRE, "bin", "java")
jarPath := path.Join(config.Config.Paths["root"], ooccp.PathBin, ooccp.ContainerGeneralFileName)
// arg := fmt.Sprintf("-jar %s %s", jarPath, pidPath)
cmd = exec.Command(javaPath, "-jar", jarPath, pidPath)
case ooccp.ContainerNetworkName:
exePath := path.Join(config.Config.Paths["root"], ooccp.PathBin, ooccp.ContainerNetworkFileName)
arg := fmt.Sprintf("-pid-path=%s", pidPath)
cmd = exec.Command(exePath, arg)
case ooccp.ContainerDiscoveryName:
exePath := path.Join(config.Config.Paths["root"], ooccp.PathBin, ooccp.ContainerDiscoveryFileName)
arg := fmt.Sprintf("-pid-path=%s", pidPath)
cmd = exec.Command(exePath, arg)
default:
}
return
}

View File

@ -1,30 +0,0 @@
package client
import (
crc "git.loafle.net/commons_go/rpc/client"
crr "git.loafle.net/commons_go/rpc/registry"
oopcc "git.loafle.net/overflow/overflow_probe_container/client"
)
type ClientHandlers struct {
oopcc.ClientHandler
}
func (ch *ClientHandlers) Init(clientCTX crc.ClientContext) error {
return ch.ClientHandler.Init(clientCTX)
}
func (ch *ClientHandlers) Destroy(clientCTX crc.ClientContext) {
ch.ClientHandler.Destroy(clientCTX)
}
func (ch *ClientHandlers) Validate() {
ch.ClientHandler.Validate()
}
func NewClientHandler(rpcInvoker crr.RPCInvoker) oopcc.ClientHandler {
ch := &ClientHandlers{}
ch.ClientHandler = oopcc.NewClientHandler(rpcInvoker)
return ch
}

View File

@ -1,26 +0,0 @@
package client
import (
"net"
csc "git.loafle.net/commons_go/server/client"
)
type SocketHandlers struct {
csc.SocketHandler
}
func (sh *SocketHandlers) OnConnect(socketContext csc.SocketContext, conn net.Conn) {
// no op
}
func (sh *SocketHandlers) OnDisconnect(soc csc.Socket) {
// no op
}
func (sh *SocketHandlers) Validate() {
}
func NewSocketHandler() csc.SocketHandler {
return &SocketHandlers{}
}

View File

@ -0,0 +1,22 @@
package annotation
// @Service()
// inherit @Component
import (
"reflect"
cda "git.loafle.net/commons_go/di/annotation"
cdia "git.loafle.net/commons_go/di/injection/annotation"
)
const (
ServiceTag = "@overFlow:Service"
)
func init() {
cda.RegisterAnnotation(ServiceTag, reflect.TypeOf((*Service)(nil)))
}
type Service struct {
cdia.Component
}

View File

@ -9,9 +9,6 @@
"key": "95d8bcdc739741dca74c4a0e489e0774"
},
"paths": {
"bin": "/bin",
"config": "/config",
"pid": "/pid",
"root": "/project/overFlow/probe"
}
}

View File

@ -7,3 +7,4 @@ import:
version: v2.17.08
- package: github.com/dgrijalva/jwt-go
version: v3.1.0
- package: git.loafle.net/commons_go/di

View File

@ -11,7 +11,7 @@ import (
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
"git.loafle.net/overflow/overflow_probes/service"
oopca "git.loafle.net/overflow/overflow_probes/commons/annotation"
)
func New() ProbeManager {
@ -45,23 +45,13 @@ func (pm *probeManagers) Start() error {
}
cdr.RegisterResource("CentralClients", centralClients)
centralService := service.GetService("CentralService").(*service.CentralService)
configService := service.GetService("ConfigService").(*service.ConfigService)
crawlerService := service.GetService("CrawlerService").(*service.CrawlerService)
discoveryService := service.GetService("DiscoveryService").(*service.DiscoveryService)
logService := service.GetService("LogService").(*service.LogService)
probeService := service.GetService("ProbeService").(*service.ProbeService)
sensorService := service.GetService("SensorService").(*service.SensorService)
services := cdr.GetInstancesByAnnotationName(oopca.ServiceTag)
probeRPCRegistry.RegisterService(centralService, "")
probeRPCRegistry.RegisterService(configService, "")
probeRPCRegistry.RegisterService(crawlerService, "")
probeRPCRegistry.RegisterService(discoveryService, "")
probeRPCRegistry.RegisterService(logService, "")
probeRPCRegistry.RegisterService(probeService, "")
probeRPCRegistry.RegisterService(sensorService, "")
for _, s := range services {
probeRPCRegistry.RegisterService(s, "")
}
logging.Logger().Debug(fmt.Sprintf("%v", centralService.CentralClients))
// logging.Logger().Debug(fmt.Sprintf("%v", centralService.CentralClients))
if err := centralProbeClient.Connect(); nil != err {
return err

View File

@ -9,10 +9,12 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*CentralService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*CentralService)(nil)))
}
type CentralService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
CentralClients map[string]oogwc.Client `annotation:"@Resource()"`
}

View File

@ -8,8 +8,10 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*ConfigService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*ConfigService)(nil)))
}
type ConfigService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
}

View File

@ -4,25 +4,23 @@ import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"reflect"
"strconv"
"time"
cda "git.loafle.net/commons_go/di/annotation"
cdr "git.loafle.net/commons_go/di/registry"
"git.loafle.net/commons_go/logging"
crc "git.loafle.net/commons_go/rpc/client"
oopcc "git.loafle.net/overflow/overflow_probe_container/client"
"git.loafle.net/overflow/overflow_probes/config"
"git.loafle.net/overflow/overflow_probes/client/container"
)
func init() {
cdr.RegisterType(reflect.TypeOf((*ContainerService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*ContainerService)(nil)))
}
type ContainerService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
clients map[string]*containerState
}
@ -71,33 +69,33 @@ func (cs *ContainerService) runProcess(name string) error {
return nil
}
cmdPath := path.Join(config.Config.Paths["root"], config.Config.Paths["bin"], name+".sh")
pidPath := path.Join(config.Config.Paths["root"], config.Config.Paths["pid"], name+".pid")
// cmd, pidPath := container.GetContainerCommand(name)
// if err := cmd.Start(); nil != err {
// logging.Logger().Error(fmt.Sprintf("Probe: To run container(%s) failed err %v", name, err))
// return err
// }
cmd := exec.Command(cmdPath, pidPath)
if err := cmd.Start(); nil != err {
logging.Logger().Error(fmt.Sprintf("Probe: To run container(%s) failed err %v", name, err))
return err
}
// port, err := watchPidFileCreate(pidPath, time.Duration(time.Second*2))
// if nil != err {
// return err
// }
port := 60000
port, err := watchPidFileCreate(pidPath, time.Duration(time.Second*2))
if nil != err {
return err
}
clientAddr := fmt.Sprintf("localhost:%d", port)
client := oopcc.New(clientAddr, nil)
client := container.New(port, nil)
if err := client.Connect(); nil != err {
cmd.Process.Kill()
// cmd.Process.Kill()
return err
}
cState := &containerState{
pid: cmd.Process.Pid,
pid: 29694,
port: port,
client: client,
}
if nil == cs.clients {
cs.clients = make(map[string]*containerState, 0)
}
cs.clients[name] = cState
return nil
@ -128,9 +126,12 @@ func (cs *ContainerService) killProcess(name string) error {
return err
}
if err = p.Kill(); nil != err {
if err = p.Signal(os.Interrupt); nil != err {
return err
}
// if err = p.Kill(); nil != err {
// return err
// }
return nil
}

View File

@ -0,0 +1,69 @@
package service
import (
"fmt"
"path"
"testing"
lfcc "git.loafle.net/commons_go/config"
crc "git.loafle.net/commons_go/rpc/client"
ooccp "git.loafle.net/overflow/overflow_commons_go/config/probe"
"git.loafle.net/overflow/overflow_probes/config"
"github.com/stretchr/testify/assert"
)
func initConfig() {
dir, err := lfcc.ABSPathify("../")
if nil != err {
panic("Config path is not valid")
}
config.ConfigDir = &dir
cfp := path.Join(*config.ConfigDir, ooccp.ConfigFileName)
config.ConfigFilePath = &cfp
conf := lfcc.New()
config.Config = &ooccp.Config{}
if err := conf.Load(config.Config, *config.ConfigFilePath); nil != err {
panic(fmt.Sprintf("Config is not valid: %v", err))
}
}
func TestGetClient(t *testing.T) {
initConfig()
cs := GetService("ContainerService").(*ContainerService)
var c crc.Client
var err error
// // General
// c, err = cs.GetClient(ooccp.ContainerGeneralName)
// assert.Nil(t, err)
// assert.NotNil(t, c)
// err = cs.killProcess(ooccp.ContainerGeneralName)
// assert.Nil(t, err)
// Network
c, err = cs.GetClient(ooccp.ContainerNetworkName)
assert.Nil(t, err)
assert.NotNil(t, c)
cs.killProcess(ooccp.ContainerNetworkName)
assert.Nil(t, err)
// // Discovery
// c, err = cs.GetClient(ooccp.ContainerDiscoveryName)
// assert.Nil(t, err)
// assert.NotNil(t, c)
// cs.killProcess(ooccp.ContainerDiscoveryName)
// assert.Nil(t, err)
}

View File

@ -9,10 +9,11 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*CrawlerService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*CrawlerService)(nil)))
}
type CrawlerService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
}
func (cs *CrawlerService) Install() error {

View File

@ -11,10 +11,12 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*DiscoveryService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*DiscoveryService)(nil)))
}
type DiscoveryService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
ContainerService *ContainerService `annotation:"@Inject()"`
CentralService *CentralService `annotation:"@Inject()"`
}

View File

@ -8,10 +8,11 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*LogService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*LogService)(nil)))
}
type LogService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
}
func (ls *LogService) Send() error {

View File

@ -8,10 +8,11 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*ProbeService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*ProbeService)(nil)))
}
type ProbeService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
}
func (ps *ProbeService) Start() error {

View File

@ -9,10 +9,11 @@ import (
)
func init() {
cdr.RegisterType(reflect.TypeOf((*SensorService)(nil)), &cda.ComponentAnnotation{})
cdr.RegisterType(reflect.TypeOf((*SensorService)(nil)))
}
type SensorService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
}
func (ss *SensorService) Start(id int64) error {

View File

@ -1,13 +1,5 @@
package service
import (
"fmt"
"reflect"
cdr "git.loafle.net/commons_go/di/registry"
"git.loafle.net/commons_go/logging"
)
func InitService() {
}
@ -15,34 +7,34 @@ func DestroyService() {
}
func GetService(name string) interface{} {
var t reflect.Type
switch name {
case "CentralService":
t = reflect.TypeOf((*CentralService)(nil))
case "ConfigService":
t = reflect.TypeOf((*ConfigService)(nil))
case "ContainerService":
t = reflect.TypeOf((*ContainerService)(nil))
case "CrawlerService":
t = reflect.TypeOf((*CrawlerService)(nil))
case "DiscoveryService":
t = reflect.TypeOf((*DiscoveryService)(nil))
case "LogService":
t = reflect.TypeOf((*LogService)(nil))
case "ProbeService":
t = reflect.TypeOf((*ProbeService)(nil))
case "SensorService":
t = reflect.TypeOf((*SensorService)(nil))
default:
logging.Logger().Panic(fmt.Sprintf("Probe: Service[%s] is not exist", name))
return nil
}
// func GetService(name string) interface{} {
// var t reflect.Type
// switch name {
// case "CentralService":
// t = reflect.TypeOf((*CentralService)(nil))
// case "ConfigService":
// t = reflect.TypeOf((*ConfigService)(nil))
// case "ContainerService":
// t = reflect.TypeOf((*ContainerService)(nil))
// case "CrawlerService":
// t = reflect.TypeOf((*CrawlerService)(nil))
// case "DiscoveryService":
// t = reflect.TypeOf((*DiscoveryService)(nil))
// case "LogService":
// t = reflect.TypeOf((*LogService)(nil))
// case "ProbeService":
// t = reflect.TypeOf((*ProbeService)(nil))
// case "SensorService":
// t = reflect.TypeOf((*SensorService)(nil))
// default:
// logging.Logger().Panic(fmt.Sprintf("Probe: Service[%s] is not exist", name))
// return nil
// }
i, err := cdr.GetInstance(t)
if nil != err {
logging.Logger().Panic(fmt.Sprintf("Probe: Getting Service[%s] is failed %v", name, err))
return nil
}
return i
}
// i, err := cdr.GetInstance(t)
// if nil != err {
// logging.Logger().Panic(fmt.Sprintf("Probe: Getting Service[%s] is failed %v", name, err))
// return nil
// }
// return i
// }