probe/container/container-session.go
crusader 37e3381997 int
2018-07-01 13:09:50 +09:00

189 lines
4.5 KiB
Go

package container
import (
"fmt"
"os/exec"
"strconv"
"sync"
"time"
logging "git.loafle.net/commons/logging-go"
css "git.loafle.net/commons/server-go/socket"
occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/probe/config"
)
// OnConnectInfo pairs a container type with a send-only channel of socket
// messages.
// NOTE(review): presumably handed over when a container connects so that its
// write channel can be attached to the matching session — nothing in this file
// constructs or reads it, so confirm against the callers.
type OnConnectInfo struct {
// ContainerType is the kind of container the info refers to.
ContainerType occp.ContainerType
// WriteChan is a send-only channel carrying socket messages.
WriteChan chan<- css.SocketMessage
}
// NewContainerSession builds a session for the given container type.
//
// The returned session owns a buffered outbound queue holding up to 256 socket
// messages. No process is launched and no goroutine is started here; that
// happens in Start.
func NewContainerSession(containerType occp.ContainerType) *ContainerSession {
	return &ContainerSession{
		containerType: containerType,
		writeChan:     make(chan css.SocketMessage, 256),
	}
}
// ContainerSession ties together one container process and the queue of socket
// messages waiting to be forwarded to it.
type ContainerSession struct {
// containerType selects which command is launched and labels log output.
containerType occp.ContainerType
// cmd is the started container process; set by runContainer and reset to nil
// by killContainer.
cmd *exec.Cmd
// writeChan is the buffered queue filled by Send and drained by handleWrite.
writeChan chan css.SocketMessage
// containerWriteChan is the channel handed over in Connected; handleWrite
// forwards queued messages into it. It is nil until Connected is called.
containerWriteChan chan<- css.SocketMessage
// writeStopChan is created in Start and closed in Stop to end handleWrite.
writeStopChan chan struct{}
// stopWg lets Stop wait until handleWrite has returned.
stopWg sync.WaitGroup
}
// Start launches the container process and, once that succeeds, spawns the
// goroutine that forwards queued messages to the container.
//
// If the process cannot be started the error is returned and no goroutine is
// created.
func (cs *ContainerSession) Start() error {
	err := cs.runContainer()
	if err != nil {
		return err
	}

	// Register the writer with the wait group before it starts so that Stop
	// can wait for it to finish.
	cs.stopWg.Add(1)
	cs.writeStopChan = make(chan struct{})
	go cs.handleWrite()

	return nil
}
// Stop signals the write goroutine to exit, kills the container process and
// waits until the write goroutine has returned.
//
// Stop is safe to call when Start was never called (or failed) and when Stop
// has already run: closing a nil or already-closed channel would panic, so the
// stop channel is only closed while it is set and is cleared afterwards.
// An error from killing the process is logged, not returned.
func (cs *ContainerSession) Stop() {
	if cs.writeStopChan != nil {
		close(cs.writeStopChan)
	}

	if err := cs.killContainer(); err != nil {
		logging.Logger().Error(err)
	}

	cs.stopWg.Wait()

	// The writer has exited, so nothing reads the channel any more; clearing it
	// turns a repeated Stop into a no-op instead of a double close.
	cs.writeStopChan = nil
}
// Connected records the channel through which messages are delivered to the
// container and logs the event at debug level.
func (cs *ContainerSession) Connected(writeChan chan<- css.SocketMessage) {
	name := cs.containerType.String()
	logging.Logger().Debugf("Container[%s] has been connected", name)

	cs.containerWriteChan = writeChan
}
// Disconnected logs the disconnection at debug level and then restarts the
// container process via refreshContainer.
func (cs *ContainerSession) Disconnected() {
	name := cs.containerType.String()
	logging.Logger().Debugf("Container[%s] has been disconnected", name)

	cs.refreshContainer()
}
// Send queues a message for delivery to the container without blocking.
//
// It returns nil when the message was placed on the outbound queue, and an
// error when the queue is full.
func (cs *ContainerSession) Send(messageType int, message []byte) error {
	queued := css.MakeSocketMessage(messageType, message)

	select {
	case cs.writeChan <- queued:
	default:
		// Non-blocking: a full queue is reported to the caller instead of waiting.
		return fmt.Errorf("Buffer of Container[%s] is full", cs.containerType.String())
	}

	logging.Logger().Debugf("send buffer of Container[%s] has been queued %s", cs.containerType.String(), string(message))
	return nil
}
// handleWrite is the body of the write goroutine. It takes messages off the
// outbound queue and forwards each one to the container's write channel.
//
// It returns when the stop channel is closed or when the outbound queue is
// closed, and marks the wait group as done on the way out.
func (cs *ContainerSession) handleWrite() {
	defer cs.stopWg.Done()

	// deliver keeps trying to hand one message to the container. While the
	// container channel cannot accept it (including while it is still nil), it
	// sleeps 100ms and retries. It reports false when the stop channel fired
	// before the message could be delivered.
	deliver := func(messageType int, message []byte) bool {
		for {
			select {
			case <-cs.writeStopChan:
				return false
			case cs.containerWriteChan <- css.MakeSocketMessage(messageType, message):
				logging.Logger().Debugf("message of Container[%s] has been sended %s", cs.containerType.String(), string(message))
				return true
			default:
				time.Sleep(100 * time.Millisecond)
			}
		}
	}

	for {
		select {
		case <-cs.writeStopChan:
			return
		case queued, open := <-cs.writeChan:
			if !open {
				logging.Logger().Debugf("WriteChan of Container[%s] has been closed", cs.containerType.String())
				return
			}

			msgType, payload := queued()
			if !deliver(msgType, payload) {
				return
			}
		}
	}
}
// runContainer builds the launch command for the session's container type and
// starts it, remembering the started command in cs.cmd.
//
// It returns an error when no command is available for the container type or
// when the process fails to start; in both cases the failure is logged and
// cs.cmd is left unchanged.
func (cs *ContainerSession) runContainer() error {
	cmd := cotainerCommand(cs.containerType)
	if cmd == nil {
		// cotainerCommand yields nil for a container type it does not handle;
		// calling Start on a nil command would panic.
		err := fmt.Errorf("no launch command for Container[%s]", cs.containerType.String())
		logging.Logger().Errorf("to run Container[%s] failed err %v", cs.containerType.String(), err)
		return err
	}

	if err := cmd.Start(); err != nil {
		logging.Logger().Errorf("to run Container[%s] failed err %v", cs.containerType.String(), err)
		return err
	}

	cs.cmd = cmd
	logging.Logger().Debugf("Process of Container[%s] has been started", cs.containerType.String())
	return nil
}
// killContainer terminates the container process, if one was started, and
// waits for it to exit so the operating system can release it.
//
// cs.cmd is always reset to nil and a debug line is always logged on return,
// whether or not a process was running. An error is returned only when the
// kill request itself fails.
func (cs *ContainerSession) killContainer() error {
	defer func() {
		cs.cmd = nil
		logging.Logger().Debugf("Process of Container[%s] has been stopped", cs.containerType.String())
	}()

	if cs.cmd == nil {
		return nil
	}

	if err := cs.cmd.Process.Kill(); err != nil {
		return err
	}

	// Reap the killed process; without Wait its resources are never released
	// (it lingers as a zombie on Unix). Wait reports the kill as a non-nil exit
	// error, which is the expected outcome here, so the result is ignored.
	_ = cs.cmd.Wait()

	return nil
}
// refreshContainer restarts the container process: it kills the current one
// and then launches a new one. A failure in either step is logged and does not
// prevent the following step; the final debug line is written regardless.
func (cs *ContainerSession) refreshContainer() {
	steps := []func() error{
		cs.killContainer,
		cs.runContainer,
	}

	for _, step := range steps {
		if err := step(); err != nil {
			logging.Logger().Error(err)
		}
	}

	logging.Logger().Debugf("Process of Container[%s] has been refreshed", cs.containerType.String())
}
// cotainerCommand builds the (not yet started) command that launches the
// container of the given type.
//
// Discovery and network containers are run directly from their binary with the
// probe port and logging-config path passed as flags. The general container is
// run through the Java binary with "-jar". For any other container type the
// result is nil.
func cotainerCommand(containerType occp.ContainerType) *exec.Cmd {
	logConfigPath := config.ContainerLoggingConfigFilePath(containerType)
	binPath := config.ContainerBinFilePath(containerType)

	switch containerType {
	case occp.ContainerGeneral:
		// NOTE(review): -Dlogging.config comes after the jar path, so the java
		// launcher passes it to the program as an ordinary argument instead of
		// treating it as a JVM option, and the embedded double quotes are passed
		// literally because no shell is involved — confirm this is what the
		// Java container expects.
		javaArgs := []string{
			"-jar",
			binPath,
			strconv.FormatInt(config.ProbePortNumber, 10),
			fmt.Sprintf("-Dlogging.config=file:\"%s\"", logConfigPath),
		}
		return exec.Command(config.JavaBinPath(), javaArgs...)

	case occp.ContainerDiscovery, occp.ContainerNetwork:
		portFlag := fmt.Sprintf("-%s=%s", occp.FlagProbePortName, strconv.FormatInt(config.ProbePortNumber, 10))
		logFlag := fmt.Sprintf("-%s=%s", occp.FlagLoggingConfigFilePathName, logConfigPath)
		return exec.Command(binPath, portFlag, logFlag)
	}

	return nil
}