diff --git a/server/server.go b/server/server.go index 95fc4de..f362c90 100644 --- a/server/server.go +++ b/server/server.go @@ -12,11 +12,12 @@ func New(addr string) server.Server { rpcRegistry := cr.NewRegistry() rpc.RegisterRPC(rpcRegistry) - rpcSH := newRPCServerHandler(rpcRegistry) - rpcSH.RegisterCodec(crpj.NewServerCodec(), crpj.Name) + rpcSH := newRPCServletHandler(rpcRegistry) + rpcSH.RegisterCodec(crpj.Name, crpj.NewServerCodec()) - sh := newServerHandler(addr, rpcSH) + socketHandler := newSocketHandler(rpcSH) + sh := newServerHandler(addr, socketHandler) s := server.New(sh) return s diff --git a/server/server_handlers.go b/server/server_handlers.go index 739241f..59314a3 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -2,78 +2,48 @@ package server import ( "fmt" - "net" - "sync" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_discovery/discovery" - cRPC "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/server" ) -func newServerHandler(addr string, rpcSH RPCServletHandler) ServerHandler { +func newServerHandler(addr string, socketHandler SocketHandler) ServerHandler { sh := &ServerHandlers{ - addr: addr, - rpcSH: rpcSH, + addr: addr, } - sh.Name = "Discovery" + sh.Name = "Discovery" + sh.SocketHandler = socketHandler return sh } type ServerHandlers struct { server.ServerHandlers - rpcSH RPCServletHandler - addr string + addr string } -func (sh *ServerHandlers) Init() error { +func (sh *ServerHandlers) Init(serverCTX server.ServerContext) error { + if err := sh.ServerHandlers.Init(serverCTX); nil != err { + return err + } return nil } -func (sh *ServerHandlers) OnStart() { +func (sh *ServerHandlers) OnStart(serverCTX server.ServerContext) { + sh.ServerHandlers.OnStart(serverCTX) + discovery.DiscoveryInit() } -func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) { - var err error - if conn, err = sh.ServerHandlers.OnConnect(conn); nil != err { - return nil, err - } - nConn := newConn(conn, "jsonrpc") - - return nConn, nil -} - -func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- error) { - var err error - rpcServlet := retainRPCServlet(sh.rpcSH) - - defer func() { - releaseRPCServlet(rpcServlet) - doneChan <- err - }() - - rpcDoneChan := make(chan error, 1) - - if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err { - return - } - - select { - case err = <-rpcDoneChan: - case <-stopChan: - rpcServlet.Stop() - } - -} - -func (sh *ServerHandlers) OnStop() { +func (sh *ServerHandlers) OnStop(serverCTX server.ServerContext) { discovery.DiscoveryDestroy() + + sh.ServerHandlers.OnStop(serverCTX) } func (sh *ServerHandlers) Validate() { @@ -83,22 +53,4 @@ func (sh *ServerHandlers) Validate() { logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified")) } - if nil == sh.rpcSH { - logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified")) - } -} - -var rpcServletPool sync.Pool - -func retainRPCServlet(sh RPCServletHandler) cRPC.Servlet { - v := rpcServletPool.Get() - if v == nil { - return cRPC.NewServlet(sh) - } - return v.(cRPC.Servlet) -} - -func releaseRPCServlet(s cRPC.Servlet) { - - rpcServletPool.Put(s) } diff --git a/server/server_handlers_unix.go b/server/server_handlers_unix.go index 6bc7801..4aad065 100644 --- a/server/server_handlers_unix.go +++ b/server/server_handlers_unix.go @@ -3,9 +3,11 @@ package server import ( "net" "os" + + "git.loafle.net/commons_go/server" ) -func (sh *ServerHandlers) Listen() (net.Listener, error) { +func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener, error) { os.Remove(sh.addr) l, err := net.ListenUnix("unix", &net.UnixAddr{Name: sh.addr, Net: "unix"}) if nil == err { diff --git a/server/server_handlers_windows.go b/server/server_handlers_windows.go index 4e04941..4c51a3e 100644 --- a/server/server_handlers_windows.go +++ b/server/server_handlers_windows.go @@ -3,10 +3,11 @@ package server import ( "net" + "git.loafle.net/commons_go/server" "gopkg.in/natefinch/npipe.v2" ) -func (sh *ServerHandlers) Listen() (net.Listener, error) { +func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener, error) { ln, err := npipe.Listen(`\\.\pipe\` + sh.addr) if err != nil { // handle error diff --git a/server/socket_handler.go b/server/socket_handler.go new file mode 100644 index 0000000..ed3b15e --- /dev/null +++ b/server/socket_handler.go @@ -0,0 +1,9 @@ +package server + +import ( + "git.loafle.net/commons_go/server" +) + +type SocketHandler interface { + server.SocketHandler +} diff --git a/server/socket_handlers.go b/server/socket_handlers.go new file mode 100644 index 0000000..8377cac --- /dev/null +++ b/server/socket_handlers.go @@ -0,0 +1,55 @@ +package server + +import ( + "net" + + "git.loafle.net/commons_go/server" +) + +func newSocketHandler(rpcSH RPCServletHandler) SocketHandler { + sh := &SocketHandlers{ + rpcSH: rpcSH, + } + + return sh +} + +type SocketHandlers struct { + server.SocketHandlers + + rpcSH RPCServletHandler +} + +func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error { + if err := sh.SocketHandlers.Init(serverCTX); nil != err { + return err + } + + return nil +} + +func (sh *SocketHandlers) Handshake(serverCTX server.ServerContext, conn net.Conn) (id string) { + return "" +} + +func (sh *SocketHandlers) OnConnect(soc server.Socket) { + // no op +} + +func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) { + // no op +} + +func (sh *SocketHandlers) OnDisconnect(soc server.Socket) { + + sh.SocketHandlers.OnDisconnect(soc) +} + +func (sh *SocketHandlers) Destroy() { + + sh.SocketHandlers.Destroy() +} + +func (sh *SocketHandlers) Validate() { + +}