351 lines
8.9 KiB
Go
351 lines
8.9 KiB
Go
package service
|
|
|
|
import (
	"context"
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
	"reflect"
	"strconv"
	"strings"
	"time"

	cda "git.loafle.net/commons/di-go/annotation"
	cdr "git.loafle.net/commons/di-go/registry"
	"git.loafle.net/commons/logging-go"
	crc "git.loafle.net/commons/rpc-go/client"
	csc "git.loafle.net/commons/server-go/client"
	occp "git.loafle.net/overflow/commons-go/config/probe"
	ocsp "git.loafle.net/overflow/commons-go/service/probe"
	"git.loafle.net/overflow/probe/client/container"
	"git.loafle.net/overflow/probe/config"

	// For annotation
	_ "git.loafle.net/overflow/commons-go/core/annotation"
)
|
|
|
|
// ContainerServiceType is the reflect.Type of *ContainerService. It is the
// value this package registers with the DI registry in init.
var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil))
|
|
|
|
func init() {
|
|
cdr.RegisterType(ContainerServiceType)
|
|
}
|
|
|
|
type ContainerService struct {
|
|
ocsp.ContainerService
|
|
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
|
|
|
|
DiscoveryService *DiscoveryService `annotation:"@Inject()"`
|
|
|
|
rpcServiceMap map[occp.ContainerType][]interface{}
|
|
containerStates map[occp.ContainerType]*containerState
|
|
connectorMap map[csc.Connector]*containerState
|
|
}
|
|
|
|
func (s *ContainerService) InitService() error {
|
|
s.containerStates = make(map[occp.ContainerType]*containerState)
|
|
s.rpcServiceMap = make(map[occp.ContainerType][]interface{})
|
|
s.connectorMap = make(map[csc.Connector]*containerState)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *ContainerService) StartService() error {
|
|
s.rpcServiceMap[occp.ContainerDiscovery] = []interface{}{
|
|
s.DiscoveryService,
|
|
}
|
|
s.rpcServiceMap[occp.ContainerNetwork] = []interface{}{}
|
|
s.rpcServiceMap[occp.ContainerGenernal] = []interface{}{}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *ContainerService) StopService() {
|
|
for containerType := range s.containerStates {
|
|
s.removeContainerState(containerType)
|
|
}
|
|
}
|
|
|
|
func (s *ContainerService) DestroyService() {
|
|
|
|
}
|
|
|
|
func (s *ContainerService) Call(containerType occp.ContainerType, result interface{}, method string, params ...interface{}) error {
|
|
client, err := s.getClient(containerType)
|
|
if nil != err {
|
|
return err
|
|
}
|
|
return client.Call(result, method, params...)
|
|
}
|
|
|
|
func (s *ContainerService) Send(containerType occp.ContainerType, method string, params ...interface{}) error {
|
|
client, err := s.getClient(containerType)
|
|
if nil != err {
|
|
return err
|
|
}
|
|
return client.Send(method, params...)
|
|
}
|
|
|
|
func (s *ContainerService) getClient(containerType occp.ContainerType) (*crc.Client, error) {
|
|
cs := s.checkContainer(containerType)
|
|
if nil == cs {
|
|
_cs, err := s.runContainer(containerType)
|
|
//_cs, err := s.debugContainer(containerType)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
cs = _cs
|
|
s.containerStates[containerType] = cs
|
|
}
|
|
|
|
if nil == cs.client {
|
|
client, err := container.NewClient(containerType, cs.port, s.rpcServiceMap[containerType])
|
|
if nil != err {
|
|
s.removeContainerState(containerType)
|
|
return nil, err
|
|
}
|
|
if err := client.Start(); nil != err {
|
|
s.removeContainerState(containerType)
|
|
return nil, err
|
|
}
|
|
cs.client = client
|
|
cs.client.Connector.SetOnDisconnected(s.onDisconnected)
|
|
s.connectorMap[cs.client.Connector] = cs
|
|
}
|
|
|
|
return cs.client, nil
|
|
}
|
|
|
|
func (s *ContainerService) onDisconnected(connector csc.Connector) {
|
|
cs, ok := s.connectorMap[connector]
|
|
if !ok || nil == cs {
|
|
return
|
|
}
|
|
s.refreshContainer(cs.containerType)
|
|
}
|
|
|
|
func (s *ContainerService) runContainer(containerType occp.ContainerType) (*containerState, error) {
|
|
if cs := s.checkContainer(containerType); nil != cs {
|
|
return cs, nil
|
|
}
|
|
|
|
cmd, pidFilePath := cotainerCommand(containerType)
|
|
removePidFile(pidFilePath)
|
|
|
|
if err := cmd.Start(); nil != err {
|
|
logging.Logger().Errorf("to run Container[%s] failed err %v", containerType.String(), err)
|
|
return nil, err
|
|
}
|
|
|
|
port, err := watchPidFileCreate(pidFilePath, time.Duration(time.Second*2))
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
|
|
// go func(containerType occp.ContainerType, cmd *exec.Cmd) {
|
|
// if err := cmd.Wait(); nil != err {
|
|
// logging.Logger().Error(err)
|
|
// }
|
|
// logging.Logger().Infof("Container[%s] has been stopped", containerType.String())
|
|
// s.refreshContainer(containerType)
|
|
// }(containerType, cmd)
|
|
|
|
cs := &containerState{
|
|
containerType: containerType,
|
|
cmd: cmd,
|
|
port: port,
|
|
}
|
|
|
|
return cs, nil
|
|
}
|
|
|
|
func (s *ContainerService) refreshContainer(containerType occp.ContainerType) {
|
|
cs := s.checkContainer(containerType)
|
|
if nil == cs {
|
|
if _, err := s.getClient(containerType); nil != err {
|
|
logging.Logger().Error(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
delete(s.connectorMap, cs.client.Connector)
|
|
err := cs.client.Stop(context.Background())
|
|
if nil != err {
|
|
logging.Logger().Error(err)
|
|
}
|
|
cs.client = nil
|
|
delete(s.containerStates, containerType)
|
|
if _, err := s.getClient(containerType); nil != err {
|
|
logging.Logger().Error(err)
|
|
}
|
|
//
|
|
//if cs, ok := s.checkContainer(containerType); nil == cs {
|
|
// cs, ok := s.containerStates[containerType]
|
|
// if !ok {
|
|
// return
|
|
// }
|
|
// delete(s.connectorMap, cs.client.Connector)
|
|
//
|
|
// logging.Logger().Debugf("Client[%s]11 has been disconnected", cs.containerType.String())
|
|
// err := cs.client.Stop(context.Background())
|
|
// if nil != err {
|
|
// logging.Logger().Error(err)
|
|
// }
|
|
//
|
|
// client, err := container.NewClient(cs.containerType, cs.port, s.rpcServiceMap[cs.containerType])
|
|
// if nil != err {
|
|
// s.removeContainerState(cs.containerType)
|
|
// logging.Logger().Error(err)
|
|
// return
|
|
// }
|
|
// if err := client.Start(); nil != err {
|
|
// s.removeContainerState(cs.containerType)
|
|
// logging.Logger().Error(err)
|
|
// return
|
|
// }
|
|
// cs.client = client
|
|
// cs.client.Connector.SetOnDisconnected(s.onDisconnected)
|
|
// s.connectorMap[cs.client.Connector] = cs
|
|
//} else {
|
|
// cs, ok := s.containerStates[containerType]
|
|
// if !ok {
|
|
// return
|
|
// }
|
|
// delete(s.connectorMap, cs.client.Connector)
|
|
// err := cs.client.Stop(context.Background())
|
|
// if nil != err {
|
|
// logging.Logger().Error(err)
|
|
// }
|
|
// delete(s.containerStates, containerType)
|
|
// _, err = s.getClient(containerType)
|
|
// if nil != err {
|
|
// logging.Logger().Error(err)
|
|
// }
|
|
//}
|
|
}
|
|
|
|
func (s *ContainerService) debugContainer(containerType occp.ContainerType) (*containerState, error) {
|
|
cs := &containerState{
|
|
containerType: containerType,
|
|
cmd: nil,
|
|
port: 60000,
|
|
}
|
|
|
|
return cs, nil
|
|
}
|
|
|
|
func (s *ContainerService) checkContainer(containerType occp.ContainerType) *containerState {
|
|
cs, ok := s.containerStates[containerType]
|
|
if !ok || nil == cs {
|
|
return nil
|
|
}
|
|
|
|
// p, err := os.FindProcess(cs.cmd.Process.Pid)
|
|
// if nil != err {
|
|
// s.removeContainerState(containerType)
|
|
// return nil
|
|
// }
|
|
// if nil == p {
|
|
// s.removeContainerState(containerType)
|
|
// return nil
|
|
// }
|
|
|
|
// if nil != cs.cmd.ProcessState && cs.cmd.ProcessState.Exited() {
|
|
// s.removeContainerState(containerType)
|
|
// return nil
|
|
// }
|
|
|
|
return cs
|
|
}
|
|
|
|
func (s *ContainerService) killContainer(containerType occp.ContainerType) error {
|
|
cs, ok := s.containerStates[containerType]
|
|
if !ok || nil == cs {
|
|
return nil
|
|
}
|
|
|
|
if nil == cs.cmd.ProcessState || !cs.cmd.ProcessState.Exited() {
|
|
if err := cs.cmd.Process.Kill(); nil != err {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ContainerService) removeContainerState(containerType occp.ContainerType) {
|
|
cs, ok := s.containerStates[containerType]
|
|
if !ok || nil == cs {
|
|
return
|
|
}
|
|
|
|
if nil != cs.client {
|
|
delete(s.connectorMap, cs.client.Connector)
|
|
cs.client.Stop(context.Background())
|
|
}
|
|
s.killContainer(containerType)
|
|
delete(s.containerStates, containerType)
|
|
}
|
|
|
|
func cotainerCommand(containerType occp.ContainerType) (cmd *exec.Cmd, pidFilePath string) {
|
|
pidFilePath = config.ContainerPIDFilePath(containerType)
|
|
loggingConfigFilePath := config.ContainerLoggingConfigFilePath(containerType)
|
|
binFilePath := config.ContainerBinFilePath(containerType)
|
|
|
|
switch containerType {
|
|
case occp.ContainerDiscovery, occp.ContainerNetwork:
|
|
args := []string{
|
|
fmt.Sprintf("-%s=%s", occp.FlagPidFilePathName, pidFilePath),
|
|
fmt.Sprintf("-%s=%s", occp.FlagLoggingConfigFilePathName, loggingConfigFilePath),
|
|
}
|
|
cmd = exec.Command(binFilePath, args...)
|
|
case occp.ContainerGenernal:
|
|
args := []string{
|
|
"-jar",
|
|
binFilePath,
|
|
pidFilePath,
|
|
loggingConfigFilePath,
|
|
}
|
|
|
|
cmd = exec.Command(config.JavaBinPath(), args...)
|
|
}
|
|
return
|
|
}
|
|
|
|
func removePidFile(pidFilePath string) {
|
|
if _, err := os.Stat(pidFilePath); err == nil {
|
|
if err := os.Remove(pidFilePath); nil != err {
|
|
logging.Logger().Errorf("removing pid file has been failed [%v]", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// watchPidFileCreate polls every 100ms until the file at pidFilePath exists
// and has content, then parses that content as a base-10 integer and returns
// it (the caller uses it as the container's port).
//
// Surrounding whitespace, such as a trailing newline, is ignored. A file that
// exists but is still empty is treated as "not written yet" and polled again,
// because the writer may create the file before filling it.
//
// Errors: a read failure or non-numeric content is returned immediately; if
// waitTime elapses first, the error says whether the file was missing or
// empty.
func watchPidFileCreate(pidFilePath string, waitTime time.Duration) (int, error) {
	const pollInterval = 100 * time.Millisecond

	startTime := time.Now()

	for {
		fileSeen := false

		if _, statErr := os.Stat(pidFilePath); statErr == nil {
			fileSeen = true

			buf, err := ioutil.ReadFile(pidFilePath)
			if nil != err {
				return 0, err
			}

			if content := strings.TrimSpace(string(buf)); "" != content {
				portNumber, err := strconv.ParseInt(content, 10, 32)
				if nil != err {
					return 0, err
				}
				return int(portNumber), nil
			}
		}

		if time.Since(startTime) > waitTime {
			if fileSeen {
				return 0, fmt.Errorf("pid file is empty")
			}
			return 0, fmt.Errorf("pid file not exist")
		}

		time.Sleep(pollInterval)
	}
}
|
|
|
|
type containerState struct {
|
|
containerType occp.ContainerType
|
|
cmd *exec.Cmd
|
|
port int
|
|
client *crc.Client
|
|
}
|