package container import ( "fmt" "os/exec" "strconv" "sync" "time" logging "git.loafle.net/commons/logging-go" occp "git.loafle.net/overflow/commons-go/config/probe" "git.loafle.net/overflow/probe/config" ) type OnConnectInfo struct { ContainerType occp.ContainerType WriteChan chan<- []byte } func NewContainerSession(containerType occp.ContainerType) *ContainerSession { cs := &ContainerSession{ containerType: containerType, writeChan: make(chan []byte, 256), } return cs } type ContainerSession struct { containerType occp.ContainerType cmd *exec.Cmd writeChan chan []byte containerWriteChan chan<- []byte writeStopChan chan struct{} stopWg sync.WaitGroup } func (cs *ContainerSession) Start() error { if err := cs.runContainer(); nil != err { return err } cs.writeStopChan = make(chan struct{}) cs.stopWg.Add(1) go cs.handleWrite() return nil } func (cs *ContainerSession) Stop() { close(cs.writeStopChan) if err := cs.killContainer(); nil != err { logging.Logger().Error(err) } cs.stopWg.Wait() } func (cs *ContainerSession) Connected(writeChan chan<- []byte) { logging.Logger().Debugf("Container[%s] has been connected", cs.containerType.String()) cs.containerWriteChan = writeChan } func (cs *ContainerSession) Disconnected() { logging.Logger().Debugf("Container[%s] has been disconnected", cs.containerType.String()) cs.refreshContainer() } func (cs *ContainerSession) Send(buff []byte) error { select { case cs.writeChan <- buff: logging.Logger().Debugf("send buffer of Container[%s] has been queued %s", cs.containerType.String(), string(buff)) return nil default: return fmt.Errorf("Buffer of Container[%s] is full", cs.containerType.String()) } } func (cs *ContainerSession) handleWrite() { defer func() { cs.stopWg.Done() }() var ( message []byte ok bool ) for { select { case message, ok = <-cs.writeChan: if !ok { logging.Logger().Debugf("WriteChan of Container[%s] has been closed", cs.containerType.String()) return } LOOP_WRITE: for { select { case cs.containerWriteChan <- message: logging.Logger().Debugf("message of Container[%s] has been sended %s", cs.containerType.String(), string(message)) break LOOP_WRITE case <-cs.writeStopChan: return default: time.Sleep(100 * time.Millisecond) continue LOOP_WRITE } } case <-cs.writeStopChan: return } } } func (cs *ContainerSession) runContainer() error { cmd := cotainerCommand(cs.containerType) if err := cmd.Start(); nil != err { logging.Logger().Errorf("to run Container[%s] failed err %v", cs.containerType.String(), err) return err } cs.cmd = cmd logging.Logger().Debugf("Process of Container[%s] has been started", cs.containerType.String()) return nil } func (cs *ContainerSession) killContainer() error { defer func() { cs.cmd = nil logging.Logger().Debugf("Process of Container[%s] has been stopped", cs.containerType.String()) }() if nil == cs.cmd { return nil } if nil == cs.cmd.ProcessState || !cs.cmd.ProcessState.Exited() { if err := cs.cmd.Process.Kill(); nil != err { return err } } return nil } func (cs *ContainerSession) refreshContainer() { if err := cs.killContainer(); nil != err { logging.Logger().Error(err) } if err := cs.runContainer(); nil != err { logging.Logger().Error(err) } logging.Logger().Debugf("Process of Container[%s] has been refreshed", cs.containerType.String()) } func cotainerCommand(containerType occp.ContainerType) (cmd *exec.Cmd) { loggingConfigFilePath := config.ContainerLoggingConfigFilePath(containerType) binFilePath := config.ContainerBinFilePath(containerType) switch containerType { case occp.ContainerDiscovery, occp.ContainerNetwork: args := []string{ fmt.Sprintf("-%s=%s", occp.FlagProbePortName, strconv.FormatInt(config.ProbePortNumber, 10)), fmt.Sprintf("-%s=%s", occp.FlagLoggingConfigFilePathName, loggingConfigFilePath), } cmd = exec.Command(binFilePath, args...) case occp.ContainerGenernal: args := []string{ "-jar", binFilePath, strconv.FormatInt(config.ProbePortNumber, 10), fmt.Sprintf("-Dlogging.config=%s", loggingConfigFilePath), } cmd = exec.Command(config.JavaBinPath(), args...) } return }