This commit is contained in:
crusader 2018-07-01 13:09:50 +09:00
parent e3d3ae3288
commit 37e3381997
11 changed files with 55 additions and 61 deletions

32
Gopkg.lock generated
View File

@ -30,23 +30,24 @@
packages = [ packages = [
".", ".",
"client", "client",
"codec",
"protocol", "protocol",
"protocol/json", "protocol/json",
"registry" "registry"
] ]
revision = "dcc3af07239b3f6fcbae3529bcb52c522b02053d" revision = "b834f36e2bdb7e5ff93e83ab165ee0310895f6e7"
[[projects]] [[projects]]
branch = "master" branch = "master"
name = "git.loafle.net/commons/server-go" name = "git.loafle.net/commons/server-go"
packages = [ packages = [
".", ".",
"client",
"socket", "socket",
"socket/client",
"socket/web", "socket/web",
"socket/web/client" "socket/web/client"
] ]
revision = "1cae23cf7819d56fd399346a1dc30af6a82d1a93" revision = "20a63b3de6efdc0942bfb4be21ddc64e2b76f59d"
[[projects]] [[projects]]
branch = "master" branch = "master"
@ -122,8 +123,8 @@
"gzip", "gzip",
"zlib" "zlib"
] ]
revision = "6c8db69c4b49dd4df1fff66996cf556176d0b9bf" revision = "5fb1f31b0a61e9858f12f39266e059848a5f1cea"
version = "v1.2.1" version = "v1.3.0"
[[projects]] [[projects]]
name = "github.com/klauspost/cpuid" name = "github.com/klauspost/cpuid"
@ -131,30 +132,15 @@
revision = "ae7887de9fa5d2db4eaa8174a7eff2c1ac00f2da" revision = "ae7887de9fa5d2db4eaa8174a7eff2c1ac00f2da"
version = "v1.1" version = "v1.1"
[[projects]]
name = "github.com/klauspost/crc32"
packages = ["."]
revision = "cb6bfca970f6908083f26f39a79009d608efd5cd"
version = "v1.1"
[[projects]] [[projects]]
branch = "master" branch = "master"
name = "github.com/shirou/gopsutil" name = "github.com/shirou/gopsutil"
packages = [ packages = [
"cpu",
"host", "host",
"internal/common", "internal/common",
"mem",
"net",
"process" "process"
] ]
revision = "b488b2747e2ee0b28e804ec1ea96c82edc5d9bb9" revision = "4a180b209f5f494e5923cfce81ea30ba23915877"
[[projects]]
branch = "master"
name = "github.com/shirou/w32"
packages = ["."]
revision = "bb4de0191aa41b5507caa14b0650cdbddcd9280b"
[[projects]] [[projects]]
name = "github.com/valyala/fasthttp" name = "github.com/valyala/fasthttp"
@ -197,7 +183,7 @@
"unix", "unix",
"windows" "windows"
] ]
revision = "ad87a3a340fa7f3bed189293fbfa7a9b7e021ae1" revision = "7138fd3d9dc8335c567ca206f4333fb75eb05d56"
[[projects]] [[projects]]
name = "gopkg.in/yaml.v2" name = "gopkg.in/yaml.v2"
@ -208,6 +194,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "92fe4d545852654ea83ba10ade4e3a06f2df9fddea268c256c5e706949ff268b" inputs-digest = "17fc7b398d69554c382198312a20efed3c3d2e5a85eb097cba2d3aa8ae857e10"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View File

@ -13,9 +13,11 @@
"writeBufferSize": 4096, "writeBufferSize": 4096,
"pongTimeout": 60, "pongTimeout": 60,
"pingTimeout": 10, "pingTimeout": 10,
"pingPeriod": 9 "pingPeriod": 9,
"compressionThreshold": 8192
} }
}, },
"probe": { "probe": {
"key": "b04f6809754411e8b5f2367a389c8dcd"
} }
} }

View File

@ -8,7 +8,7 @@ import (
cdr "git.loafle.net/commons/di-go/registry" cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client" crc "git.loafle.net/commons/rpc-go/client"
csc "git.loafle.net/commons/server-go/client" cssc "git.loafle.net/commons/server-go/socket/client"
cur "git.loafle.net/commons/util-go/reflect" cur "git.loafle.net/commons/util-go/reflect"
occn "git.loafle.net/overflow/commons-go/config/noauthprobe" occn "git.loafle.net/overflow/commons-go/config/noauthprobe"
"git.loafle.net/overflow/probe/auth/info" "git.loafle.net/overflow/probe/auth/info"
@ -57,7 +57,7 @@ func New(responseHandler func(method string, param string), services ...interfac
} }
} }
connector.OnDisconnected = func(connector csc.Connector) { connector.OnDisconnected = func(connector cssc.Connector) {
logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName()) logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName())
} }

View File

@ -4,7 +4,7 @@ import (
"sync/atomic" "sync/atomic"
crc "git.loafle.net/commons/rpc-go/client" crc "git.loafle.net/commons/rpc-go/client"
csc "git.loafle.net/commons/server-go/client" cssc "git.loafle.net/commons/server-go/socket/client"
) )
type ClientHandler interface { type ClientHandler interface {
@ -17,7 +17,7 @@ type ClientHandlers struct {
validated atomic.Value validated atomic.Value
} }
func (ch *ClientHandlers) Init(clientCtx csc.ClientCtx) error { func (ch *ClientHandlers) Init(clientCtx cssc.ClientCtx) error {
if err := ch.ClientHandlers.Init(clientCtx); nil != err { if err := ch.ClientHandlers.Init(clientCtx); nil != err {
return err return err
} }
@ -25,7 +25,7 @@ func (ch *ClientHandlers) Init(clientCtx csc.ClientCtx) error {
return nil return nil
} }
func (ch *ClientHandlers) OnStart(clientCtx csc.ClientCtx) error { func (ch *ClientHandlers) OnStart(clientCtx cssc.ClientCtx) error {
if err := ch.ClientHandlers.OnStart(clientCtx); nil != err { if err := ch.ClientHandlers.OnStart(clientCtx); nil != err {
return err return err
} }
@ -33,12 +33,12 @@ func (ch *ClientHandlers) OnStart(clientCtx csc.ClientCtx) error {
return nil return nil
} }
func (ch *ClientHandlers) OnStop(clientCtx csc.ClientCtx) { func (ch *ClientHandlers) OnStop(clientCtx cssc.ClientCtx) {
ch.ClientHandlers.OnStop(clientCtx) ch.ClientHandlers.OnStop(clientCtx)
} }
func (ch *ClientHandlers) Destroy(clientCtx csc.ClientCtx) { func (ch *ClientHandlers) Destroy(clientCtx cssc.ClientCtx) {
ch.ClientHandlers.Destroy(clientCtx) ch.ClientHandlers.Destroy(clientCtx)
} }

View File

@ -4,10 +4,10 @@ import (
crc "git.loafle.net/commons/rpc-go/client" crc "git.loafle.net/commons/rpc-go/client"
crpj "git.loafle.net/commons/rpc-go/protocol/json" crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry" crr "git.loafle.net/commons/rpc-go/registry"
csc "git.loafle.net/commons/server-go/client" cssc "git.loafle.net/commons/server-go/socket/client"
) )
func New(name string, connector csc.Connector, services []interface{}) *crc.Client { func New(name string, connector cssc.Connector, services []interface{}) *crc.Client {
codec := crpj.NewClientCodec() codec := crpj.NewClientCodec()
var rpcRegistry crr.RPCRegistry var rpcRegistry crr.RPCRegistry

View File

@ -8,13 +8,13 @@ import (
"strings" "strings"
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
csc "git.loafle.net/commons/server-go/client" cssc "git.loafle.net/commons/server-go/socket/client"
occp "git.loafle.net/overflow/commons-go/config/probe" occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/probe/client" "git.loafle.net/overflow/probe/client"
"git.loafle.net/overflow/probe/config" "git.loafle.net/overflow/probe/config"
) )
func New() (csc.Connector, error) { func New() (cssc.Connector, error) {
_config := config.GetConfig() _config := config.GetConfig()
if nil == _config { if nil == _config {
return nil, fmt.Errorf("Config is not available") return nil, fmt.Errorf("Config is not available")
@ -42,7 +42,7 @@ func New() (csc.Connector, error) {
} }
connector.ResponseHandler = func(res *http.Response) { connector.ResponseHandler = func(res *http.Response) {
} }
connector.OnDisconnected = func(connector csc.Connector) { connector.OnDisconnected = func(connector cssc.Connector) {
logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName()) logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName())
} }

View File

@ -6,7 +6,7 @@ import (
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client" crc "git.loafle.net/commons/rpc-go/client"
csc "git.loafle.net/commons/server-go/client" cssc "git.loafle.net/commons/server-go/socket/client"
occp "git.loafle.net/overflow/commons-go/config/probe" occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/probe/client" "git.loafle.net/overflow/probe/client"
"git.loafle.net/overflow/probe/config" "git.loafle.net/overflow/probe/config"
@ -38,7 +38,7 @@ func New(responseHandler func(method string, param string), services ...interfac
} }
} }
connector.OnDisconnected = func(connector csc.Connector) { connector.OnDisconnected = func(connector cssc.Connector) {
logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName()) logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName())
} }

View File

@ -8,20 +8,21 @@ import (
"time" "time"
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
css "git.loafle.net/commons/server-go/socket"
occp "git.loafle.net/overflow/commons-go/config/probe" occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/probe/config" "git.loafle.net/overflow/probe/config"
) )
type OnConnectInfo struct { type OnConnectInfo struct {
ContainerType occp.ContainerType ContainerType occp.ContainerType
WriteChan chan<- []byte WriteChan chan<- css.SocketMessage
} }
func NewContainerSession(containerType occp.ContainerType) *ContainerSession { func NewContainerSession(containerType occp.ContainerType) *ContainerSession {
cs := &ContainerSession{ cs := &ContainerSession{
containerType: containerType, containerType: containerType,
writeChan: make(chan []byte, 256), writeChan: make(chan css.SocketMessage, 256),
} }
return cs return cs
@ -30,8 +31,8 @@ func NewContainerSession(containerType occp.ContainerType) *ContainerSession {
type ContainerSession struct { type ContainerSession struct {
containerType occp.ContainerType containerType occp.ContainerType
cmd *exec.Cmd cmd *exec.Cmd
writeChan chan []byte writeChan chan css.SocketMessage
containerWriteChan chan<- []byte containerWriteChan chan<- css.SocketMessage
writeStopChan chan struct{} writeStopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
@ -59,7 +60,7 @@ func (cs *ContainerSession) Stop() {
cs.stopWg.Wait() cs.stopWg.Wait()
} }
func (cs *ContainerSession) Connected(writeChan chan<- []byte) { func (cs *ContainerSession) Connected(writeChan chan<- css.SocketMessage) {
logging.Logger().Debugf("Container[%s] has been connected", cs.containerType.String()) logging.Logger().Debugf("Container[%s] has been connected", cs.containerType.String())
cs.containerWriteChan = writeChan cs.containerWriteChan = writeChan
} }
@ -69,10 +70,10 @@ func (cs *ContainerSession) Disconnected() {
cs.refreshContainer() cs.refreshContainer()
} }
func (cs *ContainerSession) Send(buff []byte) error { func (cs *ContainerSession) Send(messageType int, message []byte) error {
select { select {
case cs.writeChan <- buff: case cs.writeChan <- css.MakeSocketMessage(messageType, message):
logging.Logger().Debugf("send buffer of Container[%s] has been queued %s", cs.containerType.String(), string(buff)) logging.Logger().Debugf("send buffer of Container[%s] has been queued %s", cs.containerType.String(), string(message))
return nil return nil
default: default:
return fmt.Errorf("Buffer of Container[%s] is full", cs.containerType.String()) return fmt.Errorf("Buffer of Container[%s] is full", cs.containerType.String())
@ -84,22 +85,26 @@ func (cs *ContainerSession) handleWrite() {
cs.stopWg.Done() cs.stopWg.Done()
}() }()
var ( var (
message []byte socketMessage css.SocketMessage
ok bool messageType int
message []byte
ok bool
) )
for { for {
select { select {
case message, ok = <-cs.writeChan: case socketMessage, ok = <-cs.writeChan:
if !ok { if !ok {
logging.Logger().Debugf("WriteChan of Container[%s] has been closed", cs.containerType.String()) logging.Logger().Debugf("WriteChan of Container[%s] has been closed", cs.containerType.String())
return return
} }
messageType, message = socketMessage()
LOOP_WRITE: LOOP_WRITE:
for { for {
select { select {
case cs.containerWriteChan <- message: case cs.containerWriteChan <- css.MakeSocketMessage(messageType, message):
logging.Logger().Debugf("message of Container[%s] has been sended %s", cs.containerType.String(), string(message)) logging.Logger().Debugf("message of Container[%s] has been sended %s", cs.containerType.String(), string(message))
break LOOP_WRITE break LOOP_WRITE
case <-cs.writeStopChan: case <-cs.writeStopChan:

View File

@ -7,7 +7,7 @@ import (
crp "git.loafle.net/commons/rpc-go/protocol" crp "git.loafle.net/commons/rpc-go/protocol"
crr "git.loafle.net/commons/rpc-go/registry" crr "git.loafle.net/commons/rpc-go/registry"
"git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
"git.loafle.net/commons/server-go/socket" css "git.loafle.net/commons/server-go/socket"
cssw "git.loafle.net/commons/server-go/socket/web" cssw "git.loafle.net/commons/server-go/socket/web"
cuc "git.loafle.net/commons/util-go/context" cuc "git.loafle.net/commons/util-go/context"
occc "git.loafle.net/overflow/commons-go/config/container" occc "git.loafle.net/overflow/commons-go/config/container"
@ -84,7 +84,7 @@ func (s *ContainerServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthtt
return nil, nil return nil, nil
} }
func (s *ContainerServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { func (s *ContainerServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) {
s.Servlets.OnConnect(servletCtx, conn) s.Servlets.OnConnect(servletCtx, conn)
} }
@ -96,7 +96,7 @@ func (s *ContainerServlets) OnDisconnect(servletCtx server.ServletCtx) {
func (s *ContainerServlets) Handle(servletCtx server.ServletCtx, func (s *ContainerServlets) Handle(servletCtx server.ServletCtx,
stopChan <-chan struct{}, doneChan chan<- struct{}, stopChan <-chan struct{}, doneChan chan<- struct{},
readChan <-chan []byte, writeChan chan<- []byte) { readChan <-chan css.SocketMessage, writeChan chan<- css.SocketMessage) {
_containerType := servletCtx.GetAttribute(ContainerTypeKey) _containerType := servletCtx.GetAttribute(ContainerTypeKey)
@ -126,7 +126,7 @@ func (s *ContainerServlets) Handle(servletCtx server.ServletCtx,
for { for {
select { select {
case msg, ok := <-readChan: case socketMessage, ok := <-readChan:
if !ok { if !ok {
return return
} }

View File

@ -110,12 +110,12 @@ func (s *ContainerService) Send(containerType occp.ContainerType, method string,
} }
containerSession := _containerSession.(*container.ContainerSession) containerSession := _containerSession.(*container.ContainerSession)
buff, err := s.RPCServerCodec.NewNotification(method, params) messageType, message, err := s.RPCServerCodec.NewNotification(method, params)
if nil != err { if nil != err {
return err return err
} }
return containerSession.Send(buff) return containerSession.Send(messageType, message)
} }
func (s *ContainerService) RunContainer(containerType occp.ContainerType) error { func (s *ContainerService) RunContainer(containerType occp.ContainerType) error {

View File

@ -8,7 +8,8 @@ import (
cda "git.loafle.net/commons/di-go/annotation" cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry" cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
csc "git.loafle.net/commons/server-go/client" css "git.loafle.net/commons/server-go/socket"
cssc "git.loafle.net/commons/server-go/socket/client"
ocmd "git.loafle.net/overflow/commons-go/model/data" ocmd "git.loafle.net/overflow/commons-go/model/data"
ocsp "git.loafle.net/overflow/commons-go/service/probe" ocsp "git.loafle.net/overflow/commons-go/service/probe"
"git.loafle.net/overflow/probe/client/data" "git.loafle.net/overflow/probe/client/data"
@ -28,9 +29,9 @@ type MetricService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"` cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
connector csc.Connector connector cssc.Connector
readChan <-chan []byte readChan <-chan css.SocketMessage
writeChan chan<- []byte writeChan chan<- css.SocketMessage
} }
func (s *MetricService) InitService() error { func (s *MetricService) InitService() error {
@ -77,7 +78,7 @@ func (s *MetricService) Send(metric *ocmd.Metric) error {
} }
// s.MetricService.Send("MS", sensorConfigID, metric) // s.MetricService.Send("MS", sensorConfigID, metric)
// logging.Logger().Debugf("Metric: %v", metric) // logging.Logger().Debugf("Metric: %v", metric)
s.writeChan <- buff s.writeChan <- css.MakeSocketMessage(css.TextMessage, buff)
return nil return nil
} }