overflow_discovery/server/server_handlers.go
crusader bb465d3628 ing
2017-11-15 15:59:43 +09:00

90 lines
1.6 KiB
Go

package server
import (
"fmt"
"log"
"net"
"git.loafle.net/commons_go/logging"
crs "git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/server"
)
func newServerHandler(addr string, rpcSH RPCServerHandler) ServerHandler {
sh := &ServerHandlers{
addr: addr,
rpcSH: rpcSH,
}
sh.Name = "Discovery"
return sh
}
type ServerHandlers struct {
server.ServerHandlers
rpcSH RPCServerHandler
addr string
}
func (sh *ServerHandlers) Init() error {
return nil
}
func (sh *ServerHandlers) OnStart() {
// no op
}
func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) {
var err error
if conn, err = sh.ServerHandlers.OnConnect(conn); nil != err {
return nil, err
}
return newConn(conn, "jsonrpc"), nil
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
dConn := conn.(Conn)
contentType := dConn.GetContentType()
codec, err := sh.rpcSH.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
for {
if err := crs.Handle(sh.rpcSH, codec, conn, conn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
select {
case <-stopChan:
return
default:
}
}
}
func (sh *ServerHandlers) OnStop() {
// no op
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if "" == sh.addr {
logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified"))
}
if nil == sh.rpcSH {
logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified"))
}
}