package service

import (
	"context"
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
	"reflect"
	"strconv"
	"time"

	cda "git.loafle.net/commons/di-go/annotation"
	cdr "git.loafle.net/commons/di-go/registry"
	"git.loafle.net/commons/logging-go"
	crc "git.loafle.net/commons/rpc-go/client"
	csc "git.loafle.net/commons/server-go/client"
	occp "git.loafle.net/overflow/commons-go/config/probe"
	ocsp "git.loafle.net/overflow/commons-go/service/probe"
	"git.loafle.net/overflow/probe/client/container"
	"git.loafle.net/overflow/probe/config"

	// For annotation
	_ "git.loafle.net/overflow/commons-go/core/annotation"
)

// ContainerServiceType is the reflected type of *ContainerService; it is
// registered with the DI registry in init.
var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil))

func init() {
	cdr.RegisterType(ContainerServiceType)
}

// ContainerService launches container processes on demand, keeps one RPC
// client per container type and relays Call/Send requests to that client.
//
// NOTE(review): the maps below are read and written from the goroutine started
// in runContainer and from the onDisconnected callback without any visible
// synchronization - confirm callers serialize access.
type ContainerService struct {
	ocsp.ContainerService
	cda.TypeAnnotation `annotation:"@overflow:RPCService()"`

	DiscoveryService *DiscoveryService `annotation:"@Inject()"`

	// rpcServiceMap holds the services handed to container.NewClient for each
	// container type.
	rpcServiceMap map[occp.ContainerType][]interface{}
	// containerStates holds the state of each launched container.
	containerStates map[occp.ContainerType]*containerState
	// connectorMap maps a client's connector back to its container state so
	// onDisconnected can find the container to refresh.
	connectorMap map[csc.Connector]*containerState
}

// InitService allocates the internal maps. It always returns nil.
func (s *ContainerService) InitService() error {
	s.containerStates = make(map[occp.ContainerType]*containerState)
	s.rpcServiceMap = make(map[occp.ContainerType][]interface{})
	s.connectorMap = make(map[csc.Connector]*containerState)

	return nil
}

// StartService fills rpcServiceMap: the discovery container gets the injected
// DiscoveryService, the network and general containers get empty lists.
// It always returns nil.
func (s *ContainerService) StartService() error {
	s.rpcServiceMap[occp.ContainerDiscovery] = []interface{}{
		s.DiscoveryService,
	}
	s.rpcServiceMap[occp.ContainerNetwork] = []interface{}{}
	s.rpcServiceMap[occp.ContainerGenernal] = []interface{}{}

	return nil
}

// StopService removes the state of every known container.
func (s *ContainerService) StopService() {
	// Deleting map entries while ranging over the map is permitted in Go.
	for containerType := range s.containerStates {
		s.removeContainerState(containerType)
	}
}

// DestroyService is a no-op.
func (s *ContainerService) DestroyService() {
}

// Call obtains the client for containerType (starting the container if
// needed) and forwards result, method and params to client.Call. It returns
// the error from getClient or from the call itself.
func (s *ContainerService) Call(containerType occp.ContainerType, result interface{}, method string, params ...interface{}) error {
	client, err := s.getClient(containerType)
	if err != nil {
		return err
	}
	return client.Call(result, method, params...)
}

// Send obtains the client for containerType (starting the container if
// needed) and forwards method and params to client.Send. It returns the error
// from getClient or from the send itself.
func (s *ContainerService) Send(containerType occp.ContainerType, method string, params ...interface{}) error {
	client, err := s.getClient(containerType)
	if err != nil {
		return err
	}
	return client.Send(method, params...)
}

// getClient returns the started client of the given container type.
//
// When no usable container state exists the container is launched through
// runContainer and its state is stored. When the state has no client yet, a
// new one is created with container.NewClient, started, hooked up to
// onDisconnected and recorded in connectorMap.
//
// If creating or starting the client fails, the container process is killed,
// its state is dropped and the error is returned.
func (s *ContainerService) getClient(containerType occp.ContainerType) (*crc.Client, error) {
	cs := s.checkContainer(containerType)
	if cs == nil {
		started, err := s.runContainer(containerType)
		//started, err := s.debugContainer(containerType)
		if err != nil {
			return nil, err
		}
		cs = started
		s.containerStates[containerType] = cs
	}

	if cs.client != nil {
		return cs.client, nil
	}

	client, err := container.NewClient(containerType, cs.port, s.rpcServiceMap[containerType])
	if err != nil {
		s.discardContainer(containerType)
		return nil, err
	}
	if err := client.Start(); err != nil {
		s.discardContainer(containerType)
		return nil, err
	}

	cs.client = client
	client.Connector.SetOnDisconnected(s.onDisconnected)
	s.connectorMap[client.Connector] = cs

	return client, nil
}

// discardContainer kills the container process and forgets its state. It is
// meant for states that never got a client attached: removeContainerState
// accesses cs.client.Connector, which would dereference a nil client there.
func (s *ContainerService) discardContainer(containerType occp.ContainerType) {
	if err := s.killContainer(containerType); err != nil {
		logging.Logger().Error(err)
	}
	delete(s.containerStates, containerType)
}

// onDisconnected is installed on every client connector. It looks up the
// container owning the connector and refreshes it; unknown connectors are
// ignored.
func (s *ContainerService) onDisconnected(connector csc.Connector) {
	cs, ok := s.connectorMap[connector]
	if !ok || cs == nil {
		return
	}
	s.refreshContainer(cs.containerType)
}

// runContainer starts the process of the given container type and returns its
// state (without a client). If checkContainer already reports a state, that
// state is returned unchanged.
//
// After the process is started, runContainer waits up to two seconds for the
// pid file and uses the number read from it as the container's port. It
// returns an error when there is no command for the container type, when the
// process cannot be started, or when the pid file cannot be read in time; in
// the last case the started process is killed so it is not left behind.
func (s *ContainerService) runContainer(containerType occp.ContainerType) (*containerState, error) {
	if cs := s.checkContainer(containerType); cs != nil {
		return cs, nil
	}

	cmd, pidFilePath := cotainerCommand(containerType)
	if cmd == nil {
		// cotainerCommand only builds a command for the container types it
		// switches on; anything else yields nil.
		return nil, fmt.Errorf("no command for Container[%s]", containerType.String())
	}
	removePidFile(pidFilePath)

	if err := cmd.Start(); err != nil {
		logging.Logger().Errorf("to run Container[%s] failed err %v", containerType.String(), err)
		return nil, err
	}

	port, err := watchPidFileCreate(pidFilePath, 2*time.Second)
	if err != nil {
		// No state will be kept for this process, so nothing else would ever
		// stop or reap it.
		if killErr := cmd.Process.Kill(); killErr != nil {
			logging.Logger().Errorf("to kill Container[%s] failed err %v", containerType.String(), killErr)
		}
		// Reap in the background so a failed Kill cannot block the caller.
		go func() { _ = cmd.Wait() }()
		return nil, err
	}

	// Watch the process and refresh the container once it exits.
	// NOTE(review): this also fires when the process is killed on purpose
	// (e.g. during StopService), and refreshContainer ends in getClient -
	// confirm the resulting relaunch is intended.
	go func(containerType occp.ContainerType, cmd *exec.Cmd) {
		if err := cmd.Wait(); err != nil {
			logging.Logger().Error(err)
		}
		logging.Logger().Infof("Container[%s] has been stopped", containerType.String())
		s.refreshContainer(containerType)
	}(containerType, cmd)

	cs := &containerState{
		containerType: containerType,
		cmd:           cmd,
		port:          port,
	}

	return cs, nil
}
func (s *ContainerService) refreshContainer(containerType occp.ContainerType) { cs := s.checkContainer(containerType) if nil == cs { if _, err := s.getClient(containerType); nil != err { logging.Logger().Error(err) } return } delete(s.connectorMap, cs.client.Connector) err := cs.client.Stop(context.Background()) if nil != err { logging.Logger().Error(err) } cs.client = nil if _, err := s.getClient(containerType); nil != err { logging.Logger().Error(err) } // //if cs, ok := s.checkContainer(containerType); nil == cs { // cs, ok := s.containerStates[containerType] // if !ok { // return // } // delete(s.connectorMap, cs.client.Connector) // // logging.Logger().Debugf("Client[%s]11 has been disconnected", cs.containerType.String()) // err := cs.client.Stop(context.Background()) // if nil != err { // logging.Logger().Error(err) // } // // client, err := container.NewClient(cs.containerType, cs.port, s.rpcServiceMap[cs.containerType]) // if nil != err { // s.removeContainerState(cs.containerType) // logging.Logger().Error(err) // return // } // if err := client.Start(); nil != err { // s.removeContainerState(cs.containerType) // logging.Logger().Error(err) // return // } // cs.client = client // cs.client.Connector.SetOnDisconnected(s.onDisconnected) // s.connectorMap[cs.client.Connector] = cs //} else { // cs, ok := s.containerStates[containerType] // if !ok { // return // } // delete(s.connectorMap, cs.client.Connector) // err := cs.client.Stop(context.Background()) // if nil != err { // logging.Logger().Error(err) // } // delete(s.containerStates, containerType) // _, err = s.getClient(containerType) // if nil != err { // logging.Logger().Error(err) // } //} } func (s *ContainerService) debugContainer(containerType occp.ContainerType) (*containerState, error) { cs := &containerState{ containerType: containerType, cmd: nil, port: 60000, } return cs, nil } func (s *ContainerService) checkContainer(containerType occp.ContainerType) *containerState { cs, ok := 
s.containerStates[containerType] if !ok || nil == cs { return nil } p, err := os.FindProcess(cs.cmd.Process.Pid) if nil != err { s.removeContainerState(containerType) return nil } if nil == p { s.removeContainerState(containerType) return nil } //if nil != cs.cmd.ProcessState && cs.cmd.ProcessState.Exited() { // s.removeContainerState(containerType) // return nil //} return cs } func (s *ContainerService) killContainer(containerType occp.ContainerType) error { cs, ok := s.containerStates[containerType] if !ok || nil == cs { return nil } if nil == cs.cmd.ProcessState || !cs.cmd.ProcessState.Exited() { if err := cs.cmd.Process.Kill(); nil != err { return err } } return nil } func (s *ContainerService) removeContainerState(containerType occp.ContainerType) { cs, ok := s.containerStates[containerType] if !ok || nil == cs { return } delete(s.connectorMap, cs.client.Connector) cs.client.Stop(context.Background()) s.killContainer(containerType) delete(s.containerStates, containerType) } func cotainerCommand(containerType occp.ContainerType) (cmd *exec.Cmd, pidFilePath string) { pidFilePath = config.ContainerPIDFilePath(containerType) loggingConfigFilePath := config.ContainerLoggingConfigFilePath(containerType) binFilePath := config.ContainerBinFilePath(containerType) switch containerType { case occp.ContainerDiscovery, occp.ContainerNetwork: args := []string{ fmt.Sprintf("-%s=%s", occp.FlagPidFilePathName, pidFilePath), fmt.Sprintf("-%s=%s", occp.FlagLoggingConfigFilePathName, loggingConfigFilePath), } cmd = exec.Command(binFilePath, args...) case occp.ContainerGenernal: args := []string{ "-jar", binFilePath, pidFilePath, loggingConfigFilePath, } cmd = exec.Command(config.JavaBinPath(), args...) 
} return } func removePidFile(pidFilePath string) { if _, err := os.Stat(pidFilePath); err == nil { if err := os.Remove(pidFilePath); nil != err { logging.Logger().Errorf("removing pid file has been failed [%v]", err) } } } func watchPidFileCreate(pidFilePath string, waitTime time.Duration) (int, error) { startTime := time.Now() for { if _, err := os.Stat(pidFilePath); err == nil { buf, err := ioutil.ReadFile(pidFilePath) if nil != err { return 0, err } portNumber, err := strconv.ParseInt(string(buf), 10, 32) if nil != err { return 0, err } return int(portNumber), nil } if time.Since(startTime) > waitTime { return 0, fmt.Errorf("pid file not exist") } time.Sleep(time.Duration(time.Millisecond * 100)) } } type containerState struct { containerType occp.ContainerType cmd *exec.Cmd port int client *crc.Client }