189 lines
4.5 KiB
Go
189 lines
4.5 KiB
Go
package container
|
|
|
|
import (
|
|
"fmt"
|
|
"os/exec"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
logging "git.loafle.net/commons/logging-go"
|
|
css "git.loafle.net/commons/server-go/socket"
|
|
occp "git.loafle.net/overflow/commons-go/config/probe"
|
|
"git.loafle.net/overflow/probe/config"
|
|
)
|
|
|
|
type OnConnectInfo struct {
|
|
ContainerType occp.ContainerType
|
|
WriteChan chan<- css.SocketMessage
|
|
}
|
|
|
|
func NewContainerSession(containerType occp.ContainerType) *ContainerSession {
|
|
|
|
cs := &ContainerSession{
|
|
containerType: containerType,
|
|
writeChan: make(chan css.SocketMessage, 256),
|
|
}
|
|
|
|
return cs
|
|
}
|
|
|
|
type ContainerSession struct {
|
|
containerType occp.ContainerType
|
|
cmd *exec.Cmd
|
|
writeChan chan css.SocketMessage
|
|
containerWriteChan chan<- css.SocketMessage
|
|
|
|
writeStopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
}
|
|
|
|
func (cs *ContainerSession) Start() error {
|
|
if err := cs.runContainer(); nil != err {
|
|
return err
|
|
}
|
|
|
|
cs.writeStopChan = make(chan struct{})
|
|
cs.stopWg.Add(1)
|
|
go cs.handleWrite()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cs *ContainerSession) Stop() {
|
|
close(cs.writeStopChan)
|
|
|
|
if err := cs.killContainer(); nil != err {
|
|
logging.Logger().Error(err)
|
|
}
|
|
|
|
cs.stopWg.Wait()
|
|
}
|
|
|
|
func (cs *ContainerSession) Connected(writeChan chan<- css.SocketMessage) {
|
|
logging.Logger().Debugf("Container[%s] has been connected", cs.containerType.String())
|
|
cs.containerWriteChan = writeChan
|
|
}
|
|
|
|
func (cs *ContainerSession) Disconnected() {
|
|
logging.Logger().Debugf("Container[%s] has been disconnected", cs.containerType.String())
|
|
cs.refreshContainer()
|
|
}
|
|
|
|
func (cs *ContainerSession) Send(messageType int, message []byte) error {
|
|
select {
|
|
case cs.writeChan <- css.MakeSocketMessage(messageType, message):
|
|
logging.Logger().Debugf("send buffer of Container[%s] has been queued %s", cs.containerType.String(), string(message))
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("Buffer of Container[%s] is full", cs.containerType.String())
|
|
}
|
|
}
|
|
|
|
func (cs *ContainerSession) handleWrite() {
|
|
defer func() {
|
|
cs.stopWg.Done()
|
|
}()
|
|
var (
|
|
socketMessage css.SocketMessage
|
|
messageType int
|
|
message []byte
|
|
ok bool
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case socketMessage, ok = <-cs.writeChan:
|
|
if !ok {
|
|
logging.Logger().Debugf("WriteChan of Container[%s] has been closed", cs.containerType.String())
|
|
return
|
|
}
|
|
|
|
messageType, message = socketMessage()
|
|
|
|
LOOP_WRITE:
|
|
for {
|
|
select {
|
|
case cs.containerWriteChan <- css.MakeSocketMessage(messageType, message):
|
|
logging.Logger().Debugf("message of Container[%s] has been sended %s", cs.containerType.String(), string(message))
|
|
break LOOP_WRITE
|
|
case <-cs.writeStopChan:
|
|
return
|
|
default:
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue LOOP_WRITE
|
|
}
|
|
}
|
|
case <-cs.writeStopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cs *ContainerSession) runContainer() error {
|
|
cmd := cotainerCommand(cs.containerType)
|
|
|
|
if err := cmd.Start(); nil != err {
|
|
logging.Logger().Errorf("to run Container[%s] failed err %v", cs.containerType.String(), err)
|
|
return err
|
|
}
|
|
|
|
cs.cmd = cmd
|
|
logging.Logger().Debugf("Process of Container[%s] has been started", cs.containerType.String())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cs *ContainerSession) killContainer() error {
|
|
defer func() {
|
|
cs.cmd = nil
|
|
logging.Logger().Debugf("Process of Container[%s] has been stopped", cs.containerType.String())
|
|
}()
|
|
|
|
if nil == cs.cmd {
|
|
return nil
|
|
}
|
|
|
|
// if nil == cs.cmd.ProcessState || !cs.cmd.ProcessState.Exited() {
|
|
if err := cs.cmd.Process.Kill(); nil != err {
|
|
return err
|
|
}
|
|
// }
|
|
return nil
|
|
}
|
|
|
|
func (cs *ContainerSession) refreshContainer() {
|
|
if err := cs.killContainer(); nil != err {
|
|
logging.Logger().Error(err)
|
|
}
|
|
if err := cs.runContainer(); nil != err {
|
|
logging.Logger().Error(err)
|
|
}
|
|
|
|
logging.Logger().Debugf("Process of Container[%s] has been refreshed", cs.containerType.String())
|
|
}
|
|
|
|
func cotainerCommand(containerType occp.ContainerType) (cmd *exec.Cmd) {
|
|
loggingConfigFilePath := config.ContainerLoggingConfigFilePath(containerType)
|
|
binFilePath := config.ContainerBinFilePath(containerType)
|
|
|
|
switch containerType {
|
|
case occp.ContainerDiscovery, occp.ContainerNetwork:
|
|
args := []string{
|
|
fmt.Sprintf("-%s=%s", occp.FlagProbePortName, strconv.FormatInt(config.ProbePortNumber, 10)),
|
|
fmt.Sprintf("-%s=%s", occp.FlagLoggingConfigFilePathName, loggingConfigFilePath),
|
|
}
|
|
cmd = exec.Command(binFilePath, args...)
|
|
case occp.ContainerGeneral:
|
|
args := []string{
|
|
"-jar",
|
|
binFilePath,
|
|
strconv.FormatInt(config.ProbePortNumber, 10),
|
|
fmt.Sprintf("-Dlogging.config=file:\"%s\"", loggingConfigFilePath),
|
|
}
|
|
|
|
cmd = exec.Command(config.JavaBinPath(), args...)
|
|
}
|
|
return
|
|
}
|