probe/service/ContainerService.go
crusader d2d34699aa ing
2018-04-18 23:56:13 +09:00

256 lines
6.4 KiB
Go

package service
import (
	"context"
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
	"reflect"
	"strconv"
	"strings"
	"time"

	cda "git.loafle.net/commons/di-go/annotation"
	cdr "git.loafle.net/commons/di-go/registry"
	"git.loafle.net/commons/logging-go"
	crc "git.loafle.net/commons/rpc-go/client"
	csc "git.loafle.net/commons/server-go/client"
	ocpcc "git.loafle.net/overflow/commons-go/probe/constants"
	"git.loafle.net/overflow/probe/client/container"
	"git.loafle.net/overflow/probe/config"

	// For annotation
	_ "git.loafle.net/overflow/commons-go/core/annotation"
)
// ContainerServiceType is the reflected *ContainerService type that gets
// registered with the DI type registry.
var ContainerServiceType = reflect.TypeOf(new(ContainerService))

// init registers ContainerServiceType with the registry package (cdr) at
// package load time.
func init() {
	cdr.RegisterType(ContainerServiceType)
}
// ContainerService starts container processes on demand and forwards RPC
// calls and notifications to them through one client per container type.
type ContainerService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
// NOTE(review): nothing in this file assigns discoveryService; presumably it
// is injected by the DI framework — confirm.
discoveryService *DiscoveryService
// rpcServiceMap holds, per container type, the services passed to
// container.NewClient when a client is created.
rpcServiceMap map[ocpcc.ContainerType][]interface{}
// containerStates holds the state of the started container for each type.
containerStates map[ocpcc.ContainerType]*containerState
// connectorMap maps a client's connector back to its container state; it is
// read by onDisconnected.
connectorMap map[csc.Connector]*containerState
}
// InitService allocates the three empty lookup maps used by the service.
// It always returns nil.
func (s *ContainerService) InitService() error {
	s.rpcServiceMap = map[ocpcc.ContainerType][]interface{}{}
	s.containerStates = map[ocpcc.ContainerType]*containerState{}
	s.connectorMap = map[csc.Connector]*containerState{}

	return nil
}
// StartService fills rpcServiceMap: the discovery container gets the
// discovery service, while the network and general containers get an empty
// service list. It always returns nil.
func (s *ContainerService) StartService() error {
	s.rpcServiceMap[ocpcc.ContainerGenernal] = make([]interface{}, 0)
	s.rpcServiceMap[ocpcc.ContainerNetwork] = make([]interface{}, 0)
	s.rpcServiceMap[ocpcc.ContainerDiscovery] = []interface{}{s.discoveryService}

	return nil
}
// StopService tears down every tracked container by removing its state.
func (s *ContainerService) StopService() {
	// Deleting entries from a map while ranging over it is well-defined in Go.
	for ct := range s.containerStates {
		s.removeContainerState(ct)
	}
}
// DestroyService currently does nothing.
func (s *ContainerService) DestroyService() {
}
// Call obtains the client for containerType (see getClient) and forwards
// result, method and params to that client's Call.
//
// It returns the error from getClient if no client could be obtained,
// otherwise whatever the client's Call returns.
func (s *ContainerService) Call(containerType ocpcc.ContainerType, result interface{}, method string, params ...interface{}) error {
	rpcClient, err := s.getClient(containerType)
	if err != nil {
		return err
	}

	return rpcClient.Call(result, method, params...)
}
// Send obtains the client for containerType (see getClient) and forwards
// method and params to that client's Send.
//
// It returns the error from getClient if no client could be obtained,
// otherwise whatever the client's Send returns.
func (s *ContainerService) Send(containerType ocpcc.ContainerType, method string, params ...interface{}) error {
	rpcClient, err := s.getClient(containerType)
	if err != nil {
		return err
	}

	return rpcClient.Send(method, params...)
}
// getClient returns the RPC client for containerType, starting the container
// and/or creating the client first when either is missing.
//
// A freshly started container is recorded in containerStates. A freshly
// created client gets onDisconnected installed as its disconnect callback and
// is indexed in connectorMap. If client creation fails, the container state is
// removed again and the error is returned.
func (s *ContainerService) getClient(containerType ocpcc.ContainerType) (*crc.Client, error) {
	state := s.checkContainer(containerType)
	if state == nil {
		started, err := s.runContainer(containerType)
		if err != nil {
			return nil, err
		}
		state = started
		s.containerStates[containerType] = state
	}

	// Reuse the client that is already attached to this container.
	if state.client != nil {
		return state.client, nil
	}

	rpcClient, err := container.NewClient(containerType, state.port, s.rpcServiceMap[containerType])
	if err != nil {
		s.removeContainerState(containerType)
		return nil, err
	}

	state.client = rpcClient
	rpcClient.Connector.SetOnDisconnected(s.onDisconnected)
	s.connectorMap[rpcClient.Connector] = state

	return rpcClient, nil
}
// onDisconnected is the disconnect callback installed on each client's
// connector. It only logs the event, and only for connectors that are known
// to connectorMap.
func (s *ContainerService) onDisconnected(connector csc.Connector) {
	// A missing key yields a nil *containerState, so one nil check covers both
	// the "unknown connector" and the "nil entry" cases.
	if state := s.connectorMap[connector]; state != nil {
		logging.Logger().Debugf("Client[%s] has been disconnected", state.containerType.String())
	}
}
// runContainer starts the process for containerType and waits (up to two
// seconds) for it to publish its port through the pid file.
//
// If a live container of that type is already tracked, its state is returned
// unchanged. On success the returned state holds the command and the port; the
// caller is responsible for storing it in containerStates.
//
// Errors are returned when the container type has no launch command, when the
// process cannot be started, or when the pid file does not yield a port in
// time. In the last case the started process is killed and reaped so it does
// not linger as an orphan.
func (s *ContainerService) runContainer(containerType ocpcc.ContainerType) (*containerState, error) {
	if cs := s.checkContainer(containerType); nil != cs {
		return cs, nil
	}

	cmd, pidFilePath := cotainerCommand(containerType)
	// cotainerCommand leaves cmd nil for container types it does not handle.
	if nil == cmd {
		return nil, fmt.Errorf("container[%s] has no launch command", containerType.String())
	}

	// Drop a stale pid file so the wait below only sees the new process' file.
	removePidFile(pidFilePath)

	if err := cmd.Start(); nil != err {
		logging.Logger().Errorf("to run Container[%s] failed err %v", containerType.String(), err)
		return nil, err
	}

	port, err := watchPidFileCreate(pidFilePath, 2*time.Second)
	if nil != err {
		// The process is running but unusable: terminate it and release its
		// resources instead of leaking it.
		if killErr := cmd.Process.Kill(); nil != killErr {
			logging.Logger().Errorf("to kill Container[%s] failed err %v", containerType.String(), killErr)
		}
		// Reap asynchronously so a failed Kill cannot block the caller; the
		// exit status of a process we just killed is of no interest.
		go func(cmd *exec.Cmd) {
			_ = cmd.Wait()
		}(cmd)
		return nil, err
	}

	// Reap the process when it terminates; Wait also populates
	// cmd.ProcessState, which checkContainer and killContainer inspect.
	go func(containerType ocpcc.ContainerType, cmd *exec.Cmd) {
		if err := cmd.Wait(); nil != err {
			logging.Logger().Error(err)
		}
		logging.Logger().Infof("Container[%s] has been stopped", containerType.String())
	}(containerType, cmd)

	cs := &containerState{
		containerType: containerType,
		cmd:           cmd,
		port:          port,
	}

	return cs, nil
}
// checkContainer returns the tracked state for containerType if its process is
// still running, or nil if nothing is tracked or the process has terminated.
// A terminated container's state is removed as a side effect.
func (s *ContainerService) checkContainer(containerType ocpcc.ContainerType) *containerState {
	cs, ok := s.containerStates[containerType]
	if !ok || nil == cs {
		return nil
	}

	// ProcessState is populated only once Wait has returned, so non-nil means
	// the process is gone. Exited() must not be consulted here: it reports
	// false for a process terminated by a signal (crash, SIGKILL), which would
	// make a dead container look alive forever.
	// NOTE(review): ProcessState is written by the Wait goroutine started in
	// runContainer; this read is unsynchronized.
	if nil != cs.cmd.ProcessState {
		s.removeContainerState(containerType)
		return nil
	}

	return cs
}
// killContainer kills the process of the container tracked for containerType.
//
// It is a no-op returning nil when no container is tracked or when its process
// has already terminated. Otherwise it returns the result of Process.Kill.
func (s *ContainerService) killContainer(containerType ocpcc.ContainerType) error {
	cs, ok := s.containerStates[containerType]
	if !ok || nil == cs {
		return nil
	}

	// A non-nil ProcessState means Wait has already reaped the process, whether
	// it exited normally or was terminated by a signal. Killing it again would
	// only produce a spurious "process already finished" error.
	if nil != cs.cmd.ProcessState {
		return nil
	}
	// Process is nil when the command was never started.
	if nil == cs.cmd.Process {
		return nil
	}

	return cs.cmd.Process.Kill()
}
// removeContainerState tears down everything tracked for containerType: it
// unregisters and stops the client (if one was created), kills the container
// process and drops the entry from containerStates. It does nothing when no
// state is tracked for containerType.
func (s *ContainerService) removeContainerState(containerType ocpcc.ContainerType) {
	cs, ok := s.containerStates[containerType]
	if !ok || nil == cs {
		return
	}

	// The client is created lazily by getClient and is still nil when client
	// creation itself failed, so it must not be dereferenced unconditionally.
	if nil != cs.client {
		delete(s.connectorMap, cs.client.Connector)
		cs.client.Stop(context.Background())
	}

	// Removal is best-effort: report a failed kill but still forget the state.
	if err := s.killContainer(containerType); nil != err {
		logging.Logger().Errorf("to kill Container[%s] failed err %v", containerType.String(), err)
	}

	delete(s.containerStates, containerType)
}
// cotainerCommand builds the (not yet started) command that launches the
// container of the given type and returns it with the pid file path the
// container is told to use.
//
// Discovery and network containers are run directly from their binary with
// the pid-file and logging-config paths passed as flags. The general
// container is run through the Java binary with "-jar" and positional
// arguments. For any other container type cmd is left nil.
func cotainerCommand(containerType ocpcc.ContainerType) (cmd *exec.Cmd, pidFilePath string) {
	pidFilePath = config.ContainerPIDFilePath(containerType)
	logConfPath := config.ContainerLoggingConfigFilePath(containerType)
	executable := config.ContainerBinFilePath(containerType)

	switch containerType {
	case ocpcc.ContainerGenernal:
		cmd = exec.Command(config.JavaBinPath(), "-jar", executable, pidFilePath, logConfPath)
	case ocpcc.ContainerDiscovery, ocpcc.ContainerNetwork:
		cmd = exec.Command(
			executable,
			fmt.Sprintf("-%s=%s", ocpcc.FlagPidFilePathName, pidFilePath),
			fmt.Sprintf("-%s=%s", ocpcc.FlagLoggingConfigFilePathName, logConfPath),
		)
	}

	return cmd, pidFilePath
}
// removePidFile deletes the file at pidFilePath if os.Stat can see it.
// A failed removal is logged, not returned.
func removePidFile(pidFilePath string) {
	if _, statErr := os.Stat(pidFilePath); statErr != nil {
		// Nothing visible at that path, so there is nothing to remove.
		return
	}

	if rmErr := os.Remove(pidFilePath); rmErr != nil {
		logging.Logger().Errorf("removing pid file has been failed [%v]", rmErr)
	}
}
// watchPidFileCreate polls every 100ms for pidFilePath to appear and returns
// the decimal integer stored in it (the caller uses it as the container's
// port).
//
// Surrounding whitespace, such as a trailing newline, is ignored. A file that
// exists but is still empty is treated as "not written yet" and polled again
// rather than rejected.
//
// An error is returned when the file cannot be read, when its content is not a
// 32-bit decimal integer, or when waitTime elapses before usable content shows
// up.
func watchPidFileCreate(pidFilePath string, waitTime time.Duration) (int, error) {
	startTime := time.Now()
	found := false

	for {
		if _, err := os.Stat(pidFilePath); err == nil {
			found = true

			buf, err := ioutil.ReadFile(pidFilePath)
			if nil != err {
				return 0, err
			}

			// The writer may create the file before writing into it, and may
			// terminate the number with a newline.
			if content := strings.TrimSpace(string(buf)); "" != content {
				portNumber, err := strconv.ParseInt(content, 10, 32)
				if nil != err {
					return 0, err
				}
				return int(portNumber), nil
			}
		}

		if time.Since(startTime) > waitTime {
			if found {
				return 0, fmt.Errorf("pid file is empty")
			}
			return 0, fmt.Errorf("pid file not exist")
		}

		time.Sleep(100 * time.Millisecond)
	}
}
// containerState is the bookkeeping record for one started container.
type containerState struct {
// containerType identifies which container this state belongs to.
containerType ocpcc.ContainerType
// cmd is the started command running the container process.
cmd *exec.Cmd
// port is the number read from the container's pid file; it is passed to
// container.NewClient.
port int
// client is the RPC client for the container; nil until getClient creates it.
client *crc.Client
}