probe/container/container-session.go
crusader defcb8c433 ing
2018-05-04 20:45:23 +09:00

184 lines
4.2 KiB
Go

package container
import (
"fmt"
"os/exec"
"strconv"
"sync"
"time"
logging "git.loafle.net/commons/logging-go"
occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/probe/config"
)
// OnConnectInfo bundles a container type with a send-only channel of byte
// slices.
// NOTE(review): nothing in this file references the type; presumably it is
// handed to whoever reacts to a container connecting - confirm against callers.
type OnConnectInfo struct {
// ContainerType is the kind of container the info belongs to.
ContainerType occp.ContainerType
// WriteChan is a send-only channel carrying raw byte slices.
WriteChan chan<- []byte
}
// NewContainerSession builds a session for the given container type.
//
// Only the outgoing message queue is created here (buffered, 256 entries);
// no process is launched and no goroutine is started until Start is called.
func NewContainerSession(containerType occp.ContainerType) *ContainerSession {
	const queueSize = 256

	return &ContainerSession{
		containerType: containerType,
		writeChan:     make(chan []byte, queueSize),
	}
}
// ContainerSession tracks one container child process together with the
// queue used to forward outgoing messages to it.
type ContainerSession struct {
// containerType selects the command that is launched and labels log messages.
containerType occp.ContainerType
// cmd is the tracked container process; set by runContainer and reset to
// nil by killContainer.
cmd *exec.Cmd
// writeChan buffers messages accepted by Send until handleWrite forwards them.
writeChan chan []byte
// containerWriteChan is the destination supplied through Connected; it is
// nil until Connected has been called.
containerWriteChan chan<- []byte
// writeStopChan is created by Start and closed by Stop to end handleWrite.
writeStopChan chan struct{}
// stopWg lets Stop wait for the handleWrite goroutine to return.
stopWg sync.WaitGroup
}
// Start launches the container process and, once that has succeeded, spawns
// the goroutine that forwards queued messages.
//
// When the process cannot be started the launch error is returned unchanged
// and no goroutine is created.
func (cs *ContainerSession) Start() error {
	err := cs.runContainer()
	if err != nil {
		return err
	}

	// The stop channel has to exist before handleWrite begins selecting on it.
	cs.writeStopChan = make(chan struct{})
	cs.stopWg.Add(1)
	go cs.handleWrite()

	return nil
}
// Stop signals the write goroutine to exit, kills the container process and
// blocks until the goroutine has returned. A failure to kill the process is
// logged, not returned.
//
// Stop may be called when Start was never called (or failed) and may be
// called repeatedly: the stop channel is closed only when it exists and is
// cleared afterwards, because closing a nil or already-closed channel panics.
// Calls to Stop must not run concurrently with each other.
func (cs *ContainerSession) Stop() {
	if cs.writeStopChan != nil {
		close(cs.writeStopChan)
	}

	if err := cs.killContainer(); err != nil {
		logging.Logger().Error(err)
	}

	cs.stopWg.Wait()

	// The write goroutine has exited by now, so nothing reads this field any
	// more; clearing it makes a repeated Stop a no-op instead of a panic.
	cs.writeStopChan = nil
}
// Connected records writeChan as the channel on which the container accepts
// messages; handleWrite forwards queued messages to it.
func (cs *ContainerSession) Connected(writeChan chan<- []byte) {
	logger := logging.Logger()
	logger.Debugf("Container[%s] has been connected", cs.containerType.String())

	cs.containerWriteChan = writeChan
}
// Disconnected is the hook for a lost container connection: it logs the
// event and then restarts the container process via refreshContainer.
func (cs *ContainerSession) Disconnected() {
	logger := logging.Logger()
	logger.Debugf("Container[%s] has been disconnected", cs.containerType.String())

	cs.refreshContainer()
}
// Send queues buff for delivery to the container without blocking.
//
// It returns nil once the buffer has been placed on the internal queue and an
// error when the queue has no free slot.
func (cs *ContainerSession) Send(buff []byte) error {
	select {
	case cs.writeChan <- buff:
		// Queued; fall through to the success path below.
	default:
		// Queue is full: reject instead of blocking the caller.
		return fmt.Errorf("Buffer of Container[%s] is full", cs.containerType.String())
	}

	logging.Logger().Debugf("send buffer of Container[%s] has been queued %s", cs.containerType.String(), string(buff))
	return nil
}
// handleWrite is the goroutine body started by Start. It takes messages off
// the internal queue one at a time and pushes each to the container's write
// channel, returning when the queue is closed or the stop channel is closed.
// The session's WaitGroup is released on return.
func (cs *ContainerSession) handleWrite() {
	defer cs.stopWg.Done()

	// forward delivers a single message to the container channel. It reports
	// false when a stop was requested before the message could be delivered.
	forward := func(msg []byte) bool {
		for {
			select {
			case cs.containerWriteChan <- msg:
				logging.Logger().Debugf("message of Container[%s] has been sended %s", cs.containerType.String(), string(msg))
				return true
			case <-cs.writeStopChan:
				return false
			default:
				// The destination is not ready (nil until Connected is
				// called, or currently full). The field is re-read on every
				// pass, so pause briefly and try again.
				time.Sleep(100 * time.Millisecond)
			}
		}
	}

	for {
		select {
		case msg, open := <-cs.writeChan:
			if !open {
				logging.Logger().Debugf("WriteChan of Container[%s] has been closed", cs.containerType.String())
				return
			}
			if !forward(msg) {
				return
			}
		case <-cs.writeStopChan:
			return
		}
	}
}
// runContainer builds the command for the session's container type, starts
// it and remembers it in cs.cmd.
//
// It returns an error, which is also logged, when no command exists for the
// container type or when the process cannot be started. cs.cmd is left
// untouched in both failure cases.
func (cs *ContainerSession) runContainer() error {
	cmd := cotainerCommand(cs.containerType)
	if cmd == nil {
		// cotainerCommand yields nil for container types it does not handle;
		// calling Start on a nil *exec.Cmd would panic.
		err := fmt.Errorf("no command defined for Container[%s]", cs.containerType.String())
		logging.Logger().Errorf("to run Container[%s] failed err %v", cs.containerType.String(), err)
		return err
	}

	if err := cmd.Start(); err != nil {
		logging.Logger().Errorf("to run Container[%s] failed err %v", cs.containerType.String(), err)
		return err
	}

	cs.cmd = cmd
	logging.Logger().Debugf("Process of Container[%s] has been started", cs.containerType.String())
	return nil
}
// killContainer terminates the tracked container process, if any, and waits
// for it to exit so that its resources are released.
//
// cs.cmd is always reset to nil on return, and the "stopped" debug message is
// always logged, including when there was nothing to kill or the kill failed.
// The error from Process.Kill is returned; the exit status of the killed
// process is deliberately ignored.
func (cs *ContainerSession) killContainer() error {
	defer func() {
		cs.cmd = nil
		logging.Logger().Debugf("Process of Container[%s] has been stopped", cs.containerType.String())
	}()

	if cs.cmd == nil {
		return nil
	}

	// ProcessState is populated only after Wait has returned, i.e. the
	// process has already completed (by exit or by signal) and been reaped.
	if cs.cmd.ProcessState != nil {
		return nil
	}

	if err := cs.cmd.Process.Kill(); err != nil {
		return err
	}

	// Kill does not wait for the process to exit. Wait reaps the child (no
	// zombie is left behind) and guarantees it is gone before a replacement
	// is started. A killed process always reports a non-success status, so
	// the returned error carries no useful information here.
	_ = cs.cmd.Wait()

	return nil
}
// refreshContainer restarts the container process: the current one is killed
// and a new one is started. Errors are logged, not returned.
//
// A failed kill does not prevent the restart attempt. The "refreshed" message
// is logged only when the new process was actually started.
func (cs *ContainerSession) refreshContainer() {
	if err := cs.killContainer(); err != nil {
		// Best effort: still try to bring up a fresh process.
		logging.Logger().Error(err)
	}

	if err := cs.runContainer(); err != nil {
		logging.Logger().Error(err)
		return
	}

	logging.Logger().Debugf("Process of Container[%s] has been refreshed", cs.containerType.String())
}
// cotainerCommand builds the (not yet started) command that launches the
// container of the given type.
//
// Discovery and network containers are run directly from their binary with
// the probe port and logging config passed as flags. The general container
// is run through the Java binary with "-jar". For any other container type
// the result is nil.
func cotainerCommand(containerType occp.ContainerType) (cmd *exec.Cmd) {
	logConfigPath := config.ContainerLoggingConfigFilePath(containerType)
	binPath := config.ContainerBinFilePath(containerType)

	switch containerType {
	case occp.ContainerDiscovery, occp.ContainerNetwork:
		portFlag := fmt.Sprintf("-%s=%s", occp.FlagProbePortName, strconv.FormatInt(config.ProbePortNumber, 10))
		logFlag := fmt.Sprintf("-%s=%s", occp.FlagLoggingConfigFilePathName, logConfigPath)
		return exec.Command(binPath, portFlag, logFlag)

	case occp.ContainerGeneral:
		// NOTE(review): the -D option comes after the jar path, where the
		// java launcher normally hands arguments to the application instead
		// of treating them as JVM options, and the quotes are literal since
		// no shell is involved - confirm the jar expects this exact form.
		javaArgs := []string{
			"-jar",
			binPath,
			strconv.FormatInt(config.ProbePortNumber, 10),
			fmt.Sprintf("-Dlogging.config=file:\"%s\"", logConfigPath),
		}
		return exec.Command(config.JavaBinPath(), javaArgs...)
	}

	return nil
}