overflow_discovery/server/server_handlers.go

105 lines
1.9 KiB
Go
Raw Normal View History

2017-10-26 12:55:55 +00:00
package server
import (
2017-11-15 06:59:43 +00:00
"fmt"
2017-11-01 06:35:03 +00:00
"net"
2017-11-29 00:36:19 +00:00
"sync"
2017-10-26 12:55:55 +00:00
2017-11-15 06:59:43 +00:00
"git.loafle.net/commons_go/logging"
2017-11-15 12:09:38 +00:00
"git.loafle.net/overflow/overflow_discovery/discovery"
2017-11-15 06:59:43 +00:00
2017-11-29 00:36:19 +00:00
cRPC "git.loafle.net/commons_go/rpc"
2017-11-15 06:59:43 +00:00
"git.loafle.net/commons_go/server"
2017-10-26 12:55:55 +00:00
)
2017-11-29 00:36:19 +00:00
func newServerHandler(addr string, rpcSH RPCServletHandler) ServerHandler {
2017-11-15 06:59:43 +00:00
sh := &ServerHandlers{
addr: addr,
rpcSH: rpcSH,
}
sh.Name = "Discovery"
2017-11-01 06:35:03 +00:00
2017-10-26 12:55:55 +00:00
return sh
}
type ServerHandlers struct {
2017-11-01 06:35:03 +00:00
server.ServerHandlers
2017-10-26 12:55:55 +00:00
2017-11-29 00:36:19 +00:00
rpcSH RPCServletHandler
2017-11-15 06:59:43 +00:00
addr string
}
func (sh *ServerHandlers) Init() error {
return nil
2017-10-26 12:55:55 +00:00
}
2017-11-01 06:35:03 +00:00
func (sh *ServerHandlers) OnStart() {
2017-11-15 12:09:38 +00:00
discovery.DiscoveryInit()
2017-11-01 06:35:03 +00:00
}
2017-10-26 13:18:20 +00:00
2017-11-15 06:59:43 +00:00
func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) {
var err error
if conn, err = sh.ServerHandlers.OnConnect(conn); nil != err {
return nil, err
}
2017-11-23 09:34:07 +00:00
nConn := newConn(conn, "jsonrpc")
return nConn, nil
2017-11-15 06:59:43 +00:00
}
2017-11-29 00:36:19 +00:00
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- error) {
var err error
rpcServlet := retainRPCServlet(sh.rpcSH)
2017-11-23 09:34:07 +00:00
defer func() {
2017-11-29 00:36:19 +00:00
releaseRPCServlet(rpcServlet)
doneChan <- err
2017-11-23 09:34:07 +00:00
}()
2017-11-29 00:36:19 +00:00
rpcDoneChan := make(chan error, 1)
if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err {
2017-11-15 06:59:43 +00:00
return
}
2017-11-29 00:36:19 +00:00
select {
case err = <-rpcDoneChan:
case <-stopChan:
rpcServlet.Stop()
2017-11-15 06:59:43 +00:00
}
2017-11-29 00:36:19 +00:00
2017-11-15 06:59:43 +00:00
}
2017-11-01 06:35:03 +00:00
func (sh *ServerHandlers) OnStop() {
2017-11-23 09:34:07 +00:00
2017-11-15 12:09:38 +00:00
discovery.DiscoveryDestroy()
2017-11-01 06:35:03 +00:00
}
2017-10-26 13:18:20 +00:00
2017-11-15 06:59:43 +00:00
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if "" == sh.addr {
logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified"))
2017-10-26 13:18:20 +00:00
}
2017-11-01 06:35:03 +00:00
2017-11-15 06:59:43 +00:00
if nil == sh.rpcSH {
logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified"))
}
2017-11-01 06:35:03 +00:00
}
2017-11-29 00:36:19 +00:00
var (
	// rpcServletPool holds cRPC.Servlet values between uses. Entries are
	// added by releaseRPCServlet and taken out by retainRPCServlet. No New
	// function is set, so Get returns nil when the pool is empty.
	rpcServletPool sync.Pool
)
// retainRPCServlet returns a servlet from rpcServletPool, or a new one built
// with cRPC.NewServlet(sh) when the pool has nothing to offer. Note that sh
// is only used when a new servlet is created; a pooled servlet is returned
// as it was stored.
func retainRPCServlet(sh RPCServletHandler) cRPC.Servlet {
	pooled := rpcServletPool.Get()
	if pooled != nil {
		return pooled.(cRPC.Servlet)
	}

	return cRPC.NewServlet(sh)
}
// releaseRPCServlet puts s into rpcServletPool so that a later
// retainRPCServlet call can reuse it.
func releaseRPCServlet(s cRPC.Servlet) {
	rpcServletPool.Put(s)
}