overflow_probe_container/server/socket_handlers.go

105 lines
2.1 KiB
Go
Raw Normal View History

2017-12-05 05:20:42 +00:00
package server
import (
"net"
"sync"
2018-03-22 12:12:17 +00:00
cr "git.loafle.net/commons_go/rpc"
2017-12-05 05:20:42 +00:00
"git.loafle.net/commons_go/rpc/protocol/json"
2017-12-05 05:34:07 +00:00
crsrs "git.loafle.net/commons_go/rpc/server/rwc/socket"
2017-12-05 05:20:42 +00:00
"git.loafle.net/commons_go/server"
2017-12-05 05:34:07 +00:00
2018-03-22 12:12:17 +00:00
oopcs "git.loafle.net/overflow/overflow_probe_container/service"
2017-12-05 05:20:42 +00:00
)
2018-03-22 12:12:17 +00:00
func NewSocketHandler(rpcSH RPCServletHandler, probeService *oopcs.ProbeService) SocketHandler {
2017-12-05 05:20:42 +00:00
rpcRWCSH := crsrs.New()
sh := &SocketHandlers{
2018-03-22 12:12:17 +00:00
rpcSH: rpcSH,
rpcRWCSH: rpcRWCSH,
probeService: probeService,
2017-12-05 05:20:42 +00:00
}
return sh
}
type SocketHandlers struct {
server.SocketHandlers
2018-03-22 12:12:17 +00:00
rpcRWCSH cr.ServletReadWriteCloseHandler
rpcSH RPCServletHandler
probeService *oopcs.ProbeService
2017-12-05 05:20:42 +00:00
}
func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error {
if err := sh.SocketHandlers.Init(serverCTX); nil != err {
return err
}
return nil
}
func (sh *SocketHandlers) Handshake(socketCTX server.SocketContext, conn net.Conn) (id string) {
2017-12-05 05:40:31 +00:00
return "server"
2017-12-05 05:20:42 +00:00
}
func (sh *SocketHandlers) OnConnect(soc server.Socket) {
sh.SocketHandlers.OnConnect(soc)
2018-03-22 12:12:17 +00:00
soc.Context().SetAttribute(cr.ContentTypeKey, json.Name)
2017-12-05 05:20:42 +00:00
}
func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) {
var err error
2018-03-22 12:12:17 +00:00
rpcServlet := retainRPCServlet(sh.rpcSH, sh.rpcRWCSH)
sh.probeService.RPCServlet = rpcServlet
2017-12-05 05:20:42 +00:00
defer func() {
2018-03-22 12:12:17 +00:00
rpcServlet.Stop()
sh.probeService.RPCServlet = nil
2017-12-05 05:20:42 +00:00
releaseRPCServlet(rpcServlet)
doneChan <- err
}()
rpcDoneChan := make(chan error, 1)
if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err {
return
}
select {
case err = <-rpcDoneChan:
case <-stopChan:
}
}
func (sh *SocketHandlers) OnDisconnect(soc server.Socket) {
sh.SocketHandlers.OnDisconnect(soc)
}
func (sh *SocketHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *SocketHandlers) Validate() {
}
var rpcServletPool sync.Pool
2018-03-22 12:12:17 +00:00
func retainRPCServlet(sh RPCServletHandler, rpcRWCSH cr.ServletReadWriteCloseHandler) cr.Servlet {
2017-12-05 05:20:42 +00:00
v := rpcServletPool.Get()
if v == nil {
2018-03-22 12:12:17 +00:00
return cr.NewServlet(sh, rpcRWCSH)
2017-12-05 05:20:42 +00:00
}
2018-03-22 12:12:17 +00:00
return v.(cr.Servlet)
2017-12-05 05:20:42 +00:00
}
2018-03-22 12:12:17 +00:00
func releaseRPCServlet(s cr.Servlet) {
2017-12-05 05:20:42 +00:00
rpcServletPool.Put(s)
}