From fda60823d65e01f8576373db74fa7f22f7cd28a9 Mon Sep 17 00:00:00 2001 From: crusader Date: Wed, 1 Nov 2017 15:17:32 +0900 Subject: [PATCH] ing --- adapter/ipc/adapter.go | 13 ---- adapter/ipc/ipc.go | 14 ----- adapter/ipc/server_handlers.go | 32 ---------- server/server.go | 106 +++++++++++++-------------------- server/server_handler.go | 7 +-- server/server_handlers.go | 8 ++- 6 files changed, 51 insertions(+), 129 deletions(-) delete mode 100644 adapter/ipc/adapter.go delete mode 100644 adapter/ipc/ipc.go delete mode 100644 adapter/ipc/server_handlers.go diff --git a/adapter/ipc/adapter.go b/adapter/ipc/adapter.go deleted file mode 100644 index 89b0605..0000000 --- a/adapter/ipc/adapter.go +++ /dev/null @@ -1,13 +0,0 @@ -package ipc - -import "git.loafle.net/commons_go/rpc" - -type IPCAdapter struct { - registry rpc.Registry -} - -func NewAdapter(registry rpc.Registry) *IPCAdapter { - return &IPCAdapter{ - registry: registry, - } -} diff --git a/adapter/ipc/ipc.go b/adapter/ipc/ipc.go deleted file mode 100644 index 8b39110..0000000 --- a/adapter/ipc/ipc.go +++ /dev/null @@ -1,14 +0,0 @@ -package ipc - -import ( - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/server" -) - -func NewServer(addr string, registry rpc.Registry) server.Server { - sh := NewServerHandler(addr, registry) - - s := server.NewServer(sh) - - return s -} diff --git a/adapter/ipc/server_handlers.go b/adapter/ipc/server_handlers.go deleted file mode 100644 index 7ed2da6..0000000 --- a/adapter/ipc/server_handlers.go +++ /dev/null @@ -1,32 +0,0 @@ -package ipc - -import ( - "io" - - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/server" - "git.loafle.net/commons_go/server/ipc" -) - -func NewServerHandler(addr string, registry rpc.Registry) ServerHandler { - sh := &ServerHandlers{ - registry: registry, - } - sh.Addr = addr - - return sh -} - -type ServerHandler interface { - server.ServerHandler -} - -type ServerHandlers struct { - ipc.ServerHandlers - - registry rpc.Registry -} - -func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) { - -} diff --git a/server/server.go b/server/server.go index 5615d8d..0db7018 100644 --- a/server/server.go +++ b/server/server.go @@ -1,89 +1,67 @@ package server import ( - "io" - "sync" + "log" + "net" "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/server" ) func New(sh ServerHandler) Server { - s := &server{ - sh: sh, - } + lsh := &serverHandlers{} + lsh.ServerHandler = sh + lsh.lsh = sh + s := server.NewServer(lsh) + return s } -type Server interface { - Start() - Stop() - Handle(r io.Reader, w io.Writer) error +type serverHandlers struct { + server.ServerHandler + lsh ServerHandler } -type server struct { - sh ServerHandler - - stopChan chan struct{} - stopWg sync.WaitGroup -} - -func (s *server) Start() { - if nil == s.sh { - panic("Server: server handler must be specified.") - } - s.sh.Validate() - - if s.stopChan != nil { - panic("Server: server is already running. Stop it before starting it again") - } - s.stopChan = make(chan struct{}) - -} - -func (s *server) Stop() { - if s.stopChan == nil { - panic("Server: server must be started before stopping it") - } - close(s.stopChan) - s.stopWg.Wait() - s.stopChan = nil -} - -func (s *server) Handle(r io.Reader, w io.Writer) error { - contentType := s.sh.GetContentType(r) - codec, err := s.sh.getCodec(contentType) +func (sh *serverHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { + contentType := sh.lsh.GetContentType(conn) + codec, err := sh.lsh.getCodec(contentType) if nil != err { - return err + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return } var codecReq protocol.ServerCodecRequest - defer func() { - if nil != codecReq { - codecReq.Complete() +Loop: + for { + sh.lsh.OnPreRead(conn) + // Create a new codec request. + codecReq, errNew := codec.NewRequest(conn) + if nil != errNew { + log.Printf("RPC Handle: %v", errNew) + doneChan <- struct{}{} + return } - }() + sh.lsh.OnPostRead(conn) - s.sh.OnPreRead(r) - // Create a new codec request. - codecReq, errNew := codec.NewRequest(r) - if nil != errNew { - return errNew - } - s.sh.OnPostRead(r) + result, err := sh.lsh.invoke(codecReq) - result, err := s.sh.invoke(codecReq) + if nil != err { + sh.lsh.OnPreWriteError(conn, err) + codecReq.WriteError(conn, 400, err) + sh.lsh.OnPostWriteError(conn, err) + } else { + sh.lsh.OnPreWriteResult(conn, result) + codecReq.WriteResponse(conn, result) + sh.lsh.OnPostWriteResult(conn, result) + } - if nil != err { - s.sh.OnPreWriteError(w, err) - codecReq.WriteError(w, 400, err) - s.sh.OnPostWriteError(w, err) - return nil + select { + case <-stopChan: + return + default: + } } - s.sh.OnPreWriteResult(w, result) - codecReq.WriteResponse(w, result) - s.sh.OnPostWriteResult(w, result) - - return nil } diff --git a/server/server_handler.go b/server/server_handler.go index 53f848c..f63ad06 100644 --- a/server/server_handler.go +++ b/server/server_handler.go @@ -4,12 +4,13 @@ import ( "io" "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/server" ) type ServerHandler interface { - RegisterCodec(codec protocol.ServerCodec, contentType string) - + server.ServerHandler GetContentType(r io.Reader) string + RegisterCodec(codec protocol.ServerCodec, contentType string) OnPreRead(r io.Reader) OnPostRead(r io.Reader) @@ -22,6 +23,4 @@ type ServerHandler interface { getCodec(contentType string) (protocol.ServerCodec, error) invoke(codec protocol.RegistryCodec) (result interface{}, err error) - - Validate() } diff --git a/server/server_handlers.go b/server/server_handlers.go index a1df42f..87ab4b6 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -7,12 +7,14 @@ import ( "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/server" ) type ServerHandlers struct { - RPCRegistry rpc.Registry + server.ServerHandlers - codecs map[string]protocol.ServerCodec + RPCRegistry rpc.Registry + codecs map[string]protocol.ServerCodec } // RegisterCodec adds a new codec to the server. @@ -56,6 +58,8 @@ func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { } func (sh *ServerHandlers) Validate() { + sh.ServerHandlers.Validate() + if nil == sh.RPCRegistry { panic("RPCRegistry must be specified.") }