This commit is contained in:
crusader 2018-03-22 21:12:17 +09:00
parent 11ee798e72
commit ffeccef8d8
5 changed files with 58 additions and 27 deletions

View File

@ -1,7 +1 @@
package overflow_probe_container package overflow_probe_container
import cuc "git.loafle.net/commons_go/util/context"
var (
RPCServletKey = cuc.ContextKey("RPCServlet")
)

View File

@ -52,13 +52,18 @@ func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener,
func (sh *ServerHandlers) OnStart(serverCTX server.ServerContext) { func (sh *ServerHandlers) OnStart(serverCTX server.ServerContext) {
sh.ServerHandlers.OnStart(serverCTX) sh.ServerHandlers.OnStart(serverCTX)
os.Remove(sh.pidPath) if err := os.Remove(sh.pidPath); nil != err {
logging.Logger().Errorf("Container: Removing pid file has been failed [%v]", err)
}
s := strconv.FormatInt(int64(sh.port), 10) s := strconv.FormatInt(int64(sh.port), 10)
ioutil.WriteFile(sh.pidPath, []byte(s), os.ModePerm) ioutil.WriteFile(sh.pidPath, []byte(s), os.ModePerm)
} }
func (sh *ServerHandlers) OnStop(serverCTX server.ServerContext) { func (sh *ServerHandlers) OnStop(serverCTX server.ServerContext) {
os.Remove(sh.pidPath) if err := os.Remove(sh.pidPath); nil != err {
logging.Logger().Errorf("Container: Removing pid file has been failed [%v]", err)
}
sh.ServerHandlers.OnStop(serverCTX) sh.ServerHandlers.OnStop(serverCTX)
} }
@ -67,7 +72,7 @@ func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate() sh.ServerHandlers.Validate()
if "" == sh.pidPath { if "" == sh.pidPath {
logging.Logger().Panicf("Server: The path of pid file must be specified") logging.Logger().Panicf("Container: The path of pid file must be specified")
} }
} }

View File

@ -4,20 +4,21 @@ import (
"net" "net"
"sync" "sync"
cRPC "git.loafle.net/commons_go/rpc" cr "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json" "git.loafle.net/commons_go/rpc/protocol/json"
crsrs "git.loafle.net/commons_go/rpc/server/rwc/socket" crsrs "git.loafle.net/commons_go/rpc/server/rwc/socket"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
oopc "git.loafle.net/overflow/overflow_probe_container" oopcs "git.loafle.net/overflow/overflow_probe_container/service"
) )
func NewSocketHandler(rpcSH RPCServletHandler) SocketHandler { func NewSocketHandler(rpcSH RPCServletHandler, probeService *oopcs.ProbeService) SocketHandler {
rpcRWCSH := crsrs.New() rpcRWCSH := crsrs.New()
sh := &SocketHandlers{ sh := &SocketHandlers{
rpcSH: rpcSH, rpcSH: rpcSH,
rpcRWCSH: rpcRWCSH, rpcRWCSH: rpcRWCSH,
probeService: probeService,
} }
return sh return sh
@ -26,8 +27,9 @@ func NewSocketHandler(rpcSH RPCServletHandler) SocketHandler {
type SocketHandlers struct { type SocketHandlers struct {
server.SocketHandlers server.SocketHandlers
rpcRWCSH cRPC.ServletReadWriteCloseHandler rpcRWCSH cr.ServletReadWriteCloseHandler
rpcSH RPCServletHandler rpcSH RPCServletHandler
probeService *oopcs.ProbeService
} }
func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error { func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error {
@ -45,15 +47,17 @@ func (sh *SocketHandlers) Handshake(socketCTX server.SocketContext, conn net.Con
func (sh *SocketHandlers) OnConnect(soc server.Socket) { func (sh *SocketHandlers) OnConnect(soc server.Socket) {
sh.SocketHandlers.OnConnect(soc) sh.SocketHandlers.OnConnect(soc)
soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name) soc.Context().SetAttribute(cr.ContentTypeKey, json.Name)
soc.Context().SetAttribute(oopc.RPCServletKey, retainRPCServlet(sh.rpcSH, sh.rpcRWCSH))
} }
func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) { func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) {
var err error var err error
rpcServlet := soc.Context().GetAttribute(oopc.RPCServletKey).(cRPC.Servlet) rpcServlet := retainRPCServlet(sh.rpcSH, sh.rpcRWCSH)
sh.probeService.RPCServlet = rpcServlet
defer func() { defer func() {
rpcServlet.Stop()
sh.probeService.RPCServlet = nil
releaseRPCServlet(rpcServlet) releaseRPCServlet(rpcServlet)
doneChan <- err doneChan <- err
}() }()
@ -66,15 +70,11 @@ func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, do
select { select {
case err = <-rpcDoneChan: case err = <-rpcDoneChan:
case <-stopChan: case <-stopChan:
rpcServlet.Stop()
<-rpcDoneChan
} }
} }
func (sh *SocketHandlers) OnDisconnect(soc server.Socket) { func (sh *SocketHandlers) OnDisconnect(soc server.Socket) {
soc.Context().RemoveAttribute(oopc.RPCServletKey)
sh.SocketHandlers.OnDisconnect(soc) sh.SocketHandlers.OnDisconnect(soc)
} }
@ -90,15 +90,15 @@ func (sh *SocketHandlers) Validate() {
var rpcServletPool sync.Pool var rpcServletPool sync.Pool
func retainRPCServlet(sh RPCServletHandler, rpcRWCSH cRPC.ServletReadWriteCloseHandler) cRPC.Servlet { func retainRPCServlet(sh RPCServletHandler, rpcRWCSH cr.ServletReadWriteCloseHandler) cr.Servlet {
v := rpcServletPool.Get() v := rpcServletPool.Get()
if v == nil { if v == nil {
return cRPC.NewServlet(sh, rpcRWCSH) return cr.NewServlet(sh, rpcRWCSH)
} }
return v.(cRPC.Servlet) return v.(cr.Servlet)
} }
func releaseRPCServlet(s cRPC.Servlet) { func releaseRPCServlet(s cr.Servlet) {
rpcServletPool.Put(s) rpcServletPool.Put(s)
} }

24
service/ProbeService.go Normal file
View File

@ -0,0 +1,24 @@
package service
import (
"reflect"
cda "git.loafle.net/commons_go/di/annotation"
cdr "git.loafle.net/commons_go/di/registry"
cr "git.loafle.net/commons_go/rpc"
)
func init() {
cdr.RegisterType(reflect.TypeOf((*ProbeService)(nil)))
}
type ProbeService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
RPCServlet cr.Servlet
}
func (ps *ProbeService) Send(method string, params ...interface{}) error {
return ps.RPCServlet.Send(method, params...)
}

8
service/service.go Normal file
View File

@ -0,0 +1,8 @@
package service
func InitService() {
}
func DestroyService() {
}