diff --git a/server/rpc_server_handler.go b/server/rpc_server_handler.go new file mode 100644 index 0000000..0e963b8 --- /dev/null +++ b/server/rpc_server_handler.go @@ -0,0 +1,24 @@ +package server + +import ( + "io" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type RPCServerHandler interface { + GetContentType(r io.Reader) string + RegisterCodec(codec protocol.ServerCodec, contentType string) + + OnPreRead(r io.Reader) + OnPostRead(r io.Reader) + + OnPreWriteResult(w io.Writer, result interface{}) + OnPostWriteResult(w io.Writer, result interface{}) + + OnPreWriteError(w io.Writer, err error) + OnPostWriteError(w io.Writer, err error) + + getCodec(contentType string) (protocol.ServerCodec, error) + invoke(codec protocol.RegistryCodec) (result interface{}, err error) +} diff --git a/server/rpc_server_handlers.go b/server/rpc_server_handlers.go new file mode 100644 index 0000000..9981cbb --- /dev/null +++ b/server/rpc_server_handlers.go @@ -0,0 +1,80 @@ +package server + +import ( + "fmt" + "io" + "strings" + + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" +) + +type RPCServerHandlers struct { + RPCRegistry rpc.Registry + codecs map[string]protocol.ServerCodec +} + +// RegisterCodec adds a new codec to the server. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (rpcSH *RPCServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) { + if nil == rpcSH.codecs { + rpcSH.codecs = make(map[string]protocol.ServerCodec) + } + rpcSH.codecs[strings.ToLower(contentType)] = codec +} + +func (rpcSH *RPCServerHandlers) GetContentType(r io.Reader) string { + return "" +} + +func (rpcSH *RPCServerHandlers) OnPreRead(r io.Reader) { + // no op +} + +func (rpcSH *RPCServerHandlers) OnPostRead(r io.Reader) { + // no op +} + +func (rpcSH *RPCServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (rpcSH *RPCServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (rpcSH *RPCServerHandlers) OnPreWriteError(w io.Writer, err error) { + // no op +} + +func (rpcSH *RPCServerHandlers) OnPostWriteError(w io.Writer, err error) { + // no op +} + +func (rpcSH *RPCServerHandlers) Validate() { + if nil == rpcSH.RPCRegistry { + panic("RPCRegistry must be specified.") + } +} + +func (rpcSH *RPCServerHandlers) getCodec(contentType string) (protocol.ServerCodec, error) { + var codec protocol.ServerCodec + if contentType == "" && len(rpcSH.codecs) == 1 { + // If Content-Type is not set and only one codec has been registered, + // then default to that codec. + for _, c := range rpcSH.codecs { + codec = c + } + } else if codec = rpcSH.codecs[strings.ToLower(contentType)]; codec == nil { + return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) + } + + return codec, nil +} + +func (rpcSH *RPCServerHandlers) invoke(codec protocol.RegistryCodec) (result interface{}, err error) { + return rpcSH.RPCRegistry.Invoke(codec) +} diff --git a/server/server.go b/server/server.go index 75869c9..e9ff5fb 100644 --- a/server/server.go +++ b/server/server.go @@ -1,67 +1,11 @@ package server import ( - "log" - "net" - "git.loafle.net/commons_go/server" ) func New(sh ServerHandler) server.Server { - lsh := &serverHandlers{} - lsh.ServerHandler = sh - lsh.lsh = sh - s := server.New(lsh) + s := server.New(sh) return s } - -type serverHandlers struct { - server.ServerHandler - lsh ServerHandler -} - -func (sh *serverHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { - contentType := sh.lsh.GetContentType(conn) - codec, err := sh.lsh.getCodec(contentType) - if nil != err { - log.Printf("RPC Handle: %v", err) - doneChan <- struct{}{} - return - } - - for { - sh.lsh.OnPreRead(conn) - // Create a new codec request. - codecReq, errNew := codec.NewRequest(conn) - if nil != errNew { - if sh.IsClientDisconnect(errNew) { - doneChan <- struct{}{} - return - } - log.Printf("RPC Handle: %v", errNew) - doneChan <- struct{}{} - return - } - sh.lsh.OnPostRead(conn) - - result, err := sh.lsh.invoke(codecReq) - - if nil != err { - sh.lsh.OnPreWriteError(conn, err) - codecReq.WriteError(conn, 400, err) - sh.lsh.OnPostWriteError(conn, err) - } else { - sh.lsh.OnPreWriteResult(conn, result) - codecReq.WriteResponse(conn, result) - sh.lsh.OnPostWriteResult(conn, result) - } - - select { - case <-stopChan: - return - default: - } - } - -} diff --git a/server/server_handler.go b/server/server_handler.go index f63ad06..cf02713 100644 --- a/server/server_handler.go +++ b/server/server_handler.go @@ -1,26 +1,7 @@ package server -import ( - "io" - - "git.loafle.net/commons_go/rpc/protocol" - "git.loafle.net/commons_go/server" -) +import "git.loafle.net/commons_go/server" type ServerHandler interface { server.ServerHandler - GetContentType(r io.Reader) string - RegisterCodec(codec protocol.ServerCodec, contentType string) - - OnPreRead(r io.Reader) - OnPostRead(r io.Reader) - - OnPreWriteResult(w io.Writer, result interface{}) - OnPostWriteResult(w io.Writer, result interface{}) - - OnPreWriteError(w io.Writer, err error) - OnPostWriteError(w io.Writer, err error) - - getCodec(contentType string) (protocol.ServerCodec, error) - invoke(codec protocol.RegistryCodec) (result interface{}, err error) } diff --git a/server/server_handlers.go b/server/server_handlers.go index 87ab4b6..47d5c45 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -1,85 +1,67 @@ package server import ( - "fmt" - "io" - "strings" + "log" + "net" - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/server" ) type ServerHandlers struct { server.ServerHandlers - RPCRegistry rpc.Registry - codecs map[string]protocol.ServerCodec + RPCServerHandler RPCServerHandler } -// RegisterCodec adds a new codec to the server. -// -// Codecs are defined to process a given serialization scheme, e.g., JSON or -// XML. A codec is chosen based on the "Content-Type" header from the request, -// excluding the charset definition. -func (sh *ServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) { - if nil == sh.codecs { - sh.codecs = make(map[string]protocol.ServerCodec) +func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { + contentType := sh.RPCServerHandler.GetContentType(conn) + codec, err := sh.RPCServerHandler.getCodec(contentType) + if nil != err { + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return } - sh.codecs[strings.ToLower(contentType)] = codec -} -func (sh *ServerHandlers) GetContentType(r io.Reader) string { - return "" -} + for { + sh.RPCServerHandler.OnPreRead(conn) + // Create a new codec request. + codecReq, errNew := codec.NewRequest(conn) + if nil != errNew { + if sh.IsClientDisconnect(errNew) { + doneChan <- struct{}{} + return + } + log.Printf("RPC Handle: %v", errNew) + doneChan <- struct{}{} + return + } + sh.RPCServerHandler.OnPostRead(conn) -func (sh *ServerHandlers) OnPreRead(r io.Reader) { - // no op -} + result, err := sh.RPCServerHandler.invoke(codecReq) -func (sh *ServerHandlers) OnPostRead(r io.Reader) { - // no op -} + if nil != err { + sh.RPCServerHandler.OnPreWriteError(conn, err) + codecReq.WriteError(conn, 400, err) + sh.RPCServerHandler.OnPostWriteError(conn, err) + } else { + sh.RPCServerHandler.OnPreWriteResult(conn, result) + codecReq.WriteResponse(conn, result) + sh.RPCServerHandler.OnPostWriteResult(conn, result) + } -func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { - // no op -} + select { + case <-stopChan: + return + default: + } + } -func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) { - // no op -} - -func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { - // no op } func (sh *ServerHandlers) Validate() { sh.ServerHandlers.Validate() - if nil == sh.RPCRegistry { - panic("RPCRegistry must be specified.") + if nil == sh.RPCServerHandler { + panic("RPCServerHandler must be specified.") } } - -func (sh *ServerHandlers) getCodec(contentType string) (protocol.ServerCodec, error) { - var codec protocol.ServerCodec - if contentType == "" && len(sh.codecs) == 1 { - // If Content-Type is not set and only one codec has been registered, - // then default to that codec. - for _, c := range sh.codecs { - codec = c - } - } else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil { - return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) - } - - return codec, nil -} - -func (sh *ServerHandlers) invoke(codec protocol.RegistryCodec) (result interface{}, err error) { - return sh.RPCRegistry.Invoke(codec) -}