This commit is contained in:
crusader 2017-11-29 14:19:36 +09:00
parent 195a4708cb
commit 2b154aeb87
6 changed files with 88 additions and 68 deletions

View File

@ -12,11 +12,12 @@ func New(addr string) server.Server {
rpcRegistry := cr.NewRegistry()
rpc.RegisterRPC(rpcRegistry)
rpcSH := newRPCServerHandler(rpcRegistry)
rpcSH.RegisterCodec(crpj.NewServerCodec(), crpj.Name)
rpcSH := newRPCServletHandler(rpcRegistry)
rpcSH.RegisterCodec(crpj.Name, crpj.NewServerCodec())
sh := newServerHandler(addr, rpcSH)
socketHandler := newSocketHandler(rpcSH)
sh := newServerHandler(addr, socketHandler)
s := server.New(sh)
return s

View File

@ -2,78 +2,48 @@ package server
import (
"fmt"
"net"
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_discovery/discovery"
cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/server"
)
func newServerHandler(addr string, rpcSH RPCServletHandler) ServerHandler {
func newServerHandler(addr string, socketHandler SocketHandler) ServerHandler {
sh := &ServerHandlers{
addr: addr,
rpcSH: rpcSH,
}
sh.Name = "Discovery"
sh.Name = "Discovery"
sh.SocketHandler = socketHandler
return sh
}
type ServerHandlers struct {
server.ServerHandlers
rpcSH RPCServletHandler
addr string
}
func (sh *ServerHandlers) Init() error {
func (sh *ServerHandlers) Init(serverCTX server.ServerContext) error {
if err := sh.ServerHandlers.Init(serverCTX); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStart() {
func (sh *ServerHandlers) OnStart(serverCTX server.ServerContext) {
sh.ServerHandlers.OnStart(serverCTX)
discovery.DiscoveryInit()
}
func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) {
var err error
if conn, err = sh.ServerHandlers.OnConnect(conn); nil != err {
return nil, err
}
nConn := newConn(conn, "jsonrpc")
return nConn, nil
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- error) {
var err error
rpcServlet := retainRPCServlet(sh.rpcSH)
defer func() {
releaseRPCServlet(rpcServlet)
doneChan <- err
}()
rpcDoneChan := make(chan error, 1)
if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err {
return
}
select {
case err = <-rpcDoneChan:
case <-stopChan:
rpcServlet.Stop()
}
}
func (sh *ServerHandlers) OnStop() {
func (sh *ServerHandlers) OnStop(serverCTX server.ServerContext) {
discovery.DiscoveryDestroy()
sh.ServerHandlers.OnStop(serverCTX)
}
func (sh *ServerHandlers) Validate() {
@ -83,22 +53,4 @@ func (sh *ServerHandlers) Validate() {
logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified"))
}
if nil == sh.rpcSH {
logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified"))
}
}
var rpcServletPool sync.Pool
func retainRPCServlet(sh RPCServletHandler) cRPC.Servlet {
v := rpcServletPool.Get()
if v == nil {
return cRPC.NewServlet(sh)
}
return v.(cRPC.Servlet)
}
func releaseRPCServlet(s cRPC.Servlet) {
rpcServletPool.Put(s)
}

View File

@ -3,9 +3,11 @@ package server
import (
"net"
"os"
"git.loafle.net/commons_go/server"
)
func (sh *ServerHandlers) Listen() (net.Listener, error) {
func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener, error) {
os.Remove(sh.addr)
l, err := net.ListenUnix("unix", &net.UnixAddr{Name: sh.addr, Net: "unix"})
if nil == err {

View File

@ -3,10 +3,11 @@ package server
import (
"net"
"git.loafle.net/commons_go/server"
"gopkg.in/natefinch/npipe.v2"
)
func (sh *ServerHandlers) Listen() (net.Listener, error) {
func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener, error) {
ln, err := npipe.Listen(`\\.\pipe\` + sh.addr)
if err != nil {
// handle error

9
server/socket_handler.go Normal file
View File

@ -0,0 +1,9 @@
package server
import (
"git.loafle.net/commons_go/server"
)
type SocketHandler interface {
server.SocketHandler
}

55
server/socket_handlers.go Normal file
View File

@ -0,0 +1,55 @@
package server
import (
"net"
"git.loafle.net/commons_go/server"
)
func newSocketHandler(rpcSH RPCServletHandler) SocketHandler {
sh := &SocketHandlers{
rpcSH: rpcSH,
}
return sh
}
type SocketHandlers struct {
server.SocketHandlers
rpcSH RPCServletHandler
}
func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error {
if err := sh.SocketHandlers.Init(serverCTX); nil != err {
return err
}
return nil
}
func (sh *SocketHandlers) Handshake(serverCTX server.ServerContext, conn net.Conn) (id string) {
return ""
}
func (sh *SocketHandlers) OnConnect(soc server.Socket) {
// no op
}
func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) {
// no op
}
func (sh *SocketHandlers) OnDisconnect(soc server.Socket) {
sh.SocketHandlers.OnDisconnect(soc)
}
func (sh *SocketHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *SocketHandlers) Validate() {
}