diff --git a/discovery/discovery.go b/discovery/discovery.go index 977a42c..1d2bd8d 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -7,7 +7,6 @@ import ( "git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/util/net/cidr" "git.loafle.net/overflow/overflow_discovery/api/module/discovery/model" - "git.loafle.net/overflow/overflow_discovery/rpc/notify" ) var discoverer *discovery @@ -141,7 +140,7 @@ func (d *discovery) discoverService(port *model.Port, ds *model.DiscoveryService } func (d *discovery) sendResult(method string, args ...interface{}) { - go notify.Notifier.Notify(method, args...) + // go notify.Notifier.Notify(method, args...) } func (d *discovery) sendError() { diff --git a/rpc/notify/notify.go b/rpc/notify/notify.go deleted file mode 100644 index a9eb83e..0000000 --- a/rpc/notify/notify.go +++ /dev/null @@ -1,27 +0,0 @@ -package notify - -import ( - "net" - - "git.loafle.net/commons_go/rpc/notify" -) - -func NotifyInit(conn net.Conn) { - Notifier = New() - Notifier.Start(conn) -} - -func NotifyDestroy() { - Notifier.Close() -} - -var Notifier notify.Notifier - -func New() notify.Notifier { - - nh := NewNotifyHandler() - - n := notify.New(nh) - - return n -} diff --git a/rpc/notify/notify_handler.go b/rpc/notify/notify_handler.go deleted file mode 100644 index f1dcd2e..0000000 --- a/rpc/notify/notify_handler.go +++ /dev/null @@ -1,7 +0,0 @@ -package notify - -import "git.loafle.net/commons_go/rpc/notify" - -type NotifyHandler interface { - notify.NotifyHandler -} diff --git a/rpc/notify/notify_handlers.go b/rpc/notify/notify_handlers.go deleted file mode 100644 index 31ac232..0000000 --- a/rpc/notify/notify_handlers.go +++ /dev/null @@ -1,17 +0,0 @@ -package notify - -import ( - "git.loafle.net/commons_go/rpc/notify" - "git.loafle.net/commons_go/rpc/protocol/json" -) - -func NewNotifyHandler() NotifyHandler { - nh := &NotifyHandlers{} - nh.Codec = json.NewClientCodec() - - return nh -} - -type NotifyHandlers struct { - notify.NotifyHandlers -} diff --git a/server/rpc_server_handlers.go b/server/rpc_server_handlers.go deleted file mode 100644 index dfc0373..0000000 --- a/server/rpc_server_handlers.go +++ /dev/null @@ -1,61 +0,0 @@ -package server - -import ( - "io" - - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/server" -) - -func newRPCServerHandler(rpcRegistry rpc.Registry) server.ServerHandler { - sh := &RPCServerHandlers{} - sh.RPCRegistry = rpcRegistry - - return sh -} - -type RPCServerHandlers struct { - server.ServerHandlers -} - -func (sh *RPCServerHandlers) Init() error { - - return nil -} - -func (sh *RPCServerHandlers) OnStart() { - // no op -} - -func (sh *RPCServerHandlers) OnPreRead(r io.Reader) { - // no op -} - -func (sh *RPCServerHandlers) OnPostRead(r io.Reader) { - // no op -} - -func (sh *RPCServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *RPCServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *RPCServerHandlers) OnPreWriteError(w io.Writer, err error) { - // no op -} - -func (sh *RPCServerHandlers) OnPostWriteError(w io.Writer, err error) { - // no op -} - -func (sh *RPCServerHandlers) OnStop() { - // no op -} - -func (sh *RPCServerHandlers) Validate() { - sh.ServerHandlers.Validate() - -} diff --git a/server/rpc_server_handler.go b/server/rpc_servlet_handler.go similarity index 54% rename from server/rpc_server_handler.go rename to server/rpc_servlet_handler.go index 01e9d6c..5b4944a 100644 --- a/server/rpc_server_handler.go +++ b/server/rpc_servlet_handler.go @@ -4,6 +4,6 @@ import ( "git.loafle.net/commons_go/rpc/server" ) -type RPCServerHandler interface { - server.ServerHandler +type RPCServletHandler interface { + server.ServletHandler } diff --git a/server/rpc_servlet_handlers.go b/server/rpc_servlet_handlers.go new file mode 100644 index 0000000..728bf24 --- /dev/null +++ b/server/rpc_servlet_handlers.go @@ -0,0 +1,37 @@ +package server + +import ( + "git.loafle.net/commons_go/rpc" + crcs "git.loafle.net/commons_go/rpc/connection/socket" + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/rpc/server" +) + +func newRPCServletHandler(rpcRegistry rpc.Registry) server.ServletHandler { + sh := &RPCServletHandlers{} + sh.RPCRegistry = rpcRegistry + + return sh +} + +type RPCServletHandlers struct { + server.ServletHandlers + rpcIO crcs.ServletHandlers +} + +func (sh *RPCServletHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { + return sh.rpcIO.ReadRequest(servletCTX, codec, conn) +} + +func (sh *RPCServletHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { + return sh.rpcIO.WriteResponse(servletCTX, conn, requestCodec, result, err) +} + +func (sh *RPCServletHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { + return sh.rpcIO.WriteNotification(servletCTX, conn, codec, method, args) +} + +func (sh *RPCServletHandlers) Validate() { + sh.ServletHandlers.Validate() + +} diff --git a/server/server_handlers.go b/server/server_handlers.go index ed72379..739241f 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -2,18 +2,17 @@ package server import ( "fmt" - "log" "net" + "sync" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_discovery/discovery" - "git.loafle.net/overflow/overflow_discovery/rpc/notify" - crs "git.loafle.net/commons_go/rpc/server" + cRPC "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/server" ) -func newServerHandler(addr string, rpcSH RPCServerHandler) ServerHandler { +func newServerHandler(addr string, rpcSH RPCServletHandler) ServerHandler { sh := &ServerHandlers{ addr: addr, rpcSH: rpcSH, @@ -26,7 +25,7 @@ func newServerHandler(addr string, rpcSH RPCServerHandler) ServerHandler { type ServerHandlers struct { server.ServerHandlers - rpcSH RPCServerHandler + rpcSH RPCServletHandler addr string } @@ -46,40 +45,30 @@ func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) { } nConn := newConn(conn, "jsonrpc") - notify.NotifyInit(nConn) - return nConn, nil } -func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { +func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- error) { + var err error + rpcServlet := retainRPCServlet(sh.rpcSH) + defer func() { - notify.NotifyDestroy() + releaseRPCServlet(rpcServlet) + doneChan <- err }() - dConn := conn.(Conn) - contentType := dConn.GetContentType() - codec, err := sh.rpcSH.GetCodec(contentType) - if nil != err { - log.Printf("RPC Handle: %v", err) - doneChan <- struct{}{} + rpcDoneChan := make(chan error, 1) + + if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err { return } - for { - if err := crs.Handle(sh.rpcSH, codec, conn, conn); nil != err { - if server.IsClientDisconnect(err) { - doneChan <- struct{}{} - return - } - log.Printf("RPC: %v", err) - } - - select { - case <-stopChan: - return - default: - } + select { + case err = <-rpcDoneChan: + case <-stopChan: + rpcServlet.Stop() } + } func (sh *ServerHandlers) OnStop() { @@ -98,3 +87,18 @@ func (sh *ServerHandlers) Validate() { logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified")) } } + +var rpcServletPool sync.Pool + +func retainRPCServlet(sh RPCServletHandler) cRPC.Servlet { + v := rpcServletPool.Get() + if v == nil { + return cRPC.NewServlet(sh) + } + return v.(cRPC.Servlet) +} + +func releaseRPCServlet(s cRPC.Servlet) { + + rpcServletPool.Put(s) +}