diff --git a/adapter/fasthttp/adapter.go b/adapter/fasthttp/adapter.go index ebb590f..d0f2d2f 100644 --- a/adapter/fasthttp/adapter.go +++ b/adapter/fasthttp/adapter.go @@ -1,19 +1,24 @@ package fasthttp import ( + "bytes" "fmt" - "io" "strings" - "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/server" "github.com/valyala/fasthttp" ) type FastHTTPAdapter struct { - registry rpc.Registry + RPCServerHandler server.ServerHandler +} + +func New(rpcServerHandler server.ServerHandler) *FastHTTPAdapter { + return &FastHTTPAdapter{ + RPCServerHandler: rpcServerHandler, + } } -// FastHTTPHandler func (a *FastHTTPAdapter) FastHTTPHandler(ctx *fasthttp.RequestCtx) { if !ctx.IsPost() { writeError(ctx, 405, "rpc: POST method required, received "+string(ctx.Method())) @@ -26,21 +31,18 @@ func (a *FastHTTPAdapter) FastHTTPHandler(ctx *fasthttp.RequestCtx) { contentType = contentType[:idx] } - // err := a.registry.Invoke(contentType, ctx.Request., ctx, beforeWrite, afterWrite) + codec, err := a.RPCServerHandler.GetCodec(contentType) + if nil != err { + writeError(ctx, 415, "rpc: Unsupported Media Type "+contentType) + return + } - // if nil != err { - // writeError(ctx, 400, err.Error()) - // } + r := bytes.NewReader(ctx.PostBody()) -} - -func beforeWrite(w io.Writer) { - ctx := w.(*fasthttp.RequestCtx) - ctx.Response.Header.Set("x-content-type-options", "nosniff") - ctx.SetContentType("application/json; charset=utf-8") -} - -func afterWrite(w io.Writer) { + if err := server.Handle(a.RPCServerHandler, codec, r, ctx); nil != err { + writeError(ctx, 500, err.Error()) + return + } } diff --git a/adapter/fasthttp/server_handlers.go b/adapter/fasthttp/server_handlers.go new file mode 100644 index 0000000..83f1ffc --- /dev/null +++ b/adapter/fasthttp/server_handlers.go @@ -0,0 +1,44 @@ +package fasthttp + +import ( + "io" + + "github.com/valyala/fasthttp" + + "git.loafle.net/commons_go/rpc/server" +) + +type ServerHandlers struct { + server.ServerHandlers +} + +func (sh *ServerHandlers) OnPreRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPostRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { + ctx := w.(*fasthttp.RequestCtx) + ctx.Response.Header.Set("x-content-type-options", "nosniff") + ctx.SetContentType("application/json; charset=utf-8") +} + +func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) Validate() { + sh.ServerHandlers.Validate() + +} diff --git a/adapter/http/adapter.go b/adapter/http/adapter.go index c831cc6..5c925e5 100644 --- a/adapter/http/adapter.go +++ b/adapter/http/adapter.go @@ -2,20 +2,20 @@ package http import ( "fmt" - "io" "net/http" "strings" - "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/server" ) type HTTPAdapter struct { - registry rpc.Registry + http.Handler + RPCServerHandler server.ServerHandler } -func NewAdapter(registry rpc.Registry) *HTTPAdapter { +func New(rpcServerHandler server.ServerHandler) *HTTPAdapter { return &HTTPAdapter{ - registry: registry, + RPCServerHandler: rpcServerHandler, } } @@ -32,22 +32,16 @@ func (a *HTTPAdapter) ServeHTTP(w http.ResponseWriter, r *http.Request) { contentType = contentType[:idx] } - err := a.registry.Invoke(contentType, r.Body, w, beforeWrite, afterWrite) - r.Body.Close() - + codec, err := a.RPCServerHandler.GetCodec(contentType) if nil != err { - writeError(w, 400, err.Error()) + writeError(w, 415, "rpc: Unsupported Media Type "+contentType) + return } -} - -func beforeWrite(w io.Writer) { - writer := w.(http.ResponseWriter) - writer.Header().Set("x-content-type-options", "nosniff") - writer.Header().Set("Content-Type", "application/json; charset=utf-8") -} - -func afterWrite(w io.Writer) { + if err := server.Handle(a.RPCServerHandler, codec, r.Body, w); nil != err { + writeError(w, 500, err.Error()) + return + } } diff --git a/adapter/http/server_handlers.go b/adapter/http/server_handlers.go new file mode 100644 index 0000000..b2efd62 --- /dev/null +++ b/adapter/http/server_handlers.go @@ -0,0 +1,43 @@ +package http + +import ( + "io" + "net/http" + + "git.loafle.net/commons_go/rpc/server" +) + +type ServerHandlers struct { + server.ServerHandlers +} + +func (sh *ServerHandlers) OnPreRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPostRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { + writer := w.(http.ResponseWriter) + writer.Header().Set("x-content-type-options", "nosniff") + writer.Header().Set("Content-Type", "application/json; charset=utf-8") +} + +func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) Validate() { + sh.ServerHandlers.Validate() + +} diff --git a/adapter/server/server_handlers.go b/adapter/server/server_handlers.go new file mode 100644 index 0000000..912598d --- /dev/null +++ b/adapter/server/server_handlers.go @@ -0,0 +1,50 @@ +package server + +import ( + "log" + "net" + + rpcServer "git.loafle.net/commons_go/rpc/server" + "git.loafle.net/commons_go/server" +) + +type ServerHandlers struct { + server.ServerHandlers + + RPCServerHandler rpcServer.ServerHandler +} + +func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { + contentType := "json" + codec, err := sh.RPCServerHandler.GetCodec(contentType) + if nil != err { + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return + } + + for { + if err := rpcServer.Handle(sh.RPCServerHandler, codec, conn, conn); nil != err { + if server.IsClientDisconnect(err) { + doneChan <- struct{}{} + return + } + log.Printf("RPC: %v", err) + } + + select { + case <-stopChan: + return + default: + } + } + +} + +func (sh *ServerHandlers) Validate() { + sh.ServerHandlers.Validate() + + if nil == sh.RPCServerHandler { + panic("RPCServerHandler must be specified.") + } +} diff --git a/adapter/websocket/fasthttp/websocket_fasthttp_socket_handlers.go b/adapter/websocket/fasthttp/websocket_fasthttp_socket_handlers.go new file mode 100644 index 0000000..6f4d84f --- /dev/null +++ b/adapter/websocket/fasthttp/websocket_fasthttp_socket_handlers.go @@ -0,0 +1,82 @@ +package fasthttp + +import ( + "io" + "log" + + "github.com/valyala/fasthttp" + + cwf "git.loafle.net/commons_go/websocket_fasthttp" + "git.loafle.net/commons_go/websocket_fasthttp/websocket" + + rpcServer "git.loafle.net/commons_go/rpc/server" + "git.loafle.net/commons_go/server" +) + +type SocketHandlers struct { + cwf.SocketHandlers + + RPCServerHandler rpcServer.ServerHandler +} + +func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (bool, *fasthttp.ResponseHeader) { + return true, nil +} + +func (sh *SocketHandlers) Handle(conn *websocket.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { + contentType := "json" + codec, err := sh.RPCServerHandler.GetCodec(contentType) + if nil != err { + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return + } + + var messageType int + var r io.Reader + var w io.WriteCloser + + // conn.SetReadLimit(maxMessageSize) + // conn.SetReadDeadline(time.Now().Add(pongWait)) + // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + + for { + if messageType, r, err = conn.NextReader(); nil != err { + doneChan <- struct{}{} + return + } + + if w, err = conn.NextWriter(messageType); nil != err { + doneChan <- struct{}{} + return + } + + if err = rpcServer.Handle(sh.RPCServerHandler, codec, r, w); nil != err { + if server.IsClientDisconnect(err) { + doneChan <- struct{}{} + return + } + log.Printf("RPC: %v", err) + } + + if err = w.Close(); nil != err { + doneChan <- struct{}{} + return + } + + select { + case <-stopChan: + return + default: + } + } + +} + +func (sh *SocketHandlers) Validate() { + sh.SocketHandlers.Validate() + + if nil == sh.RPCServerHandler { + panic("RPCServerHandler must be specified.") + } +} diff --git a/gateway/gateway.go b/gateway/gateway.go deleted file mode 100644 index 50715f9..0000000 --- a/gateway/gateway.go +++ /dev/null @@ -1,11 +0,0 @@ -package gateway - -import ( - "git.loafle.net/commons_go/server" -) - -func New(sh ServerHandler) server.Server { - s := server.New(sh) - - return s -} diff --git a/gateway/handle.go b/gateway/handle.go new file mode 100644 index 0000000..5312efd --- /dev/null +++ b/gateway/handle.go @@ -0,0 +1,18 @@ +package gateway + +import ( + "io" + + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" +) + +func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error { + return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) { + var params []byte + if params, err = codecReq.Params(); nil != err { + return nil, err + } + return sh.Invoke(codecReq.Method(), params) + }) +} diff --git a/gateway/rpc_gateway_handler.go b/gateway/rpc_gateway_handler.go deleted file mode 100644 index 3e82d82..0000000 --- a/gateway/rpc_gateway_handler.go +++ /dev/null @@ -1,11 +0,0 @@ -package gateway - -import ( - "git.loafle.net/commons_go/rpc/protocol" - "git.loafle.net/commons_go/rpc/server" -) - -type RPCGetewayHandler interface { - server.RPCServerHandler - Handle(codecReq protocol.ServerCodecRequest) (interface{}, error) -} diff --git a/gateway/rpc_gateway_handlers.go b/gateway/rpc_gateway_handlers.go deleted file mode 100644 index 267f26f..0000000 --- a/gateway/rpc_gateway_handlers.go +++ /dev/null @@ -1,20 +0,0 @@ -package gateway - -import ( - "errors" - - "git.loafle.net/commons_go/rpc/protocol" - "git.loafle.net/commons_go/rpc/server" -) - -type RPCGetewayHandlers struct { - server.RPCServerHandlers -} - -func (rpcGH *RPCGetewayHandlers) Handle(codecReq protocol.ServerCodecRequest) (result interface{}, err error) { - return nil, errors.New("RPC Gateway: Handler method[Handle] is not implement") -} - -func (rpcGH *RPCGetewayHandlers) Validate() { - rpcGH.RPCServerHandlers.Validate() -} diff --git a/gateway/server_handler.go b/gateway/server_handler.go new file mode 100644 index 0000000..890ff70 --- /dev/null +++ b/gateway/server_handler.go @@ -0,0 +1,11 @@ +package gateway + +import ( + "git.loafle.net/commons_go/rpc" +) + +type ServerHandler interface { + rpc.ServerHandler + + Invoke(method string, params []byte) (result interface{}, err error) +} diff --git a/gateway/server_handlers.go b/gateway/server_handlers.go index c751185..e695155 100644 --- a/gateway/server_handlers.go +++ b/gateway/server_handlers.go @@ -1,67 +1,20 @@ package gateway import ( - "log" - "net" + "errors" - "git.loafle.net/commons_go/server" + "git.loafle.net/commons_go/rpc" ) type ServerHandlers struct { - server.ServerHandlers - - RPCGetewayHandler RPCGetewayHandler + rpc.ServerHandlers } -func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { - contentType := sh.RPCGetewayHandler.GetContentType(conn) - codec, err := sh.RPCGetewayHandler.GetCodec(contentType) - if nil != err { - log.Printf("RPC Handle: %v", err) - doneChan <- struct{}{} - return - } - - for { - sh.RPCGetewayHandler.OnPreRead(conn) - // Create a new codec request. - codecReq, errNew := codec.NewRequest(conn) - if nil != errNew { - if sh.IsClientDisconnect(errNew) { - doneChan <- struct{}{} - return - } - log.Printf("RPC Handle: %v", errNew) - doneChan <- struct{}{} - return - } - sh.RPCGetewayHandler.OnPostRead(conn) - - result, err := sh.RPCGetewayHandler.Handle(codecReq) - - if nil != err { - sh.RPCGetewayHandler.OnPreWriteError(conn, err) - codecReq.WriteError(conn, 400, err) - sh.RPCGetewayHandler.OnPostWriteError(conn, err) - } else { - sh.RPCGetewayHandler.OnPreWriteResult(conn, result) - codecReq.WriteResponse(conn, result) - sh.RPCGetewayHandler.OnPostWriteResult(conn, result) - } - - select { - case <-stopChan: - return - default: - } - } +func (sh *ServerHandlers) Invoke(method string, params []byte) (result interface{}, err error) { + return nil, errors.New("Server: Handler method[Invoke] of Server is not implement") } func (sh *ServerHandlers) Validate() { sh.ServerHandlers.Validate() - - if nil == sh.RPCGetewayHandler { - panic("RPCGetewayHandler must be specified.") - } } diff --git a/glide.yaml b/glide.yaml index d4f5bff..d9d0fd3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,3 +4,4 @@ import: version: v20160617 - package: git.loafle.net/commons_go/server - package: gopkg.in/natefinch/npipe.v2 +- package: git.loafle.net/commons_go/websocket_fasthttp diff --git a/handle.go b/handle.go new file mode 100644 index 0000000..d877b9e --- /dev/null +++ b/handle.go @@ -0,0 +1,39 @@ +package rpc + +import ( + "io" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type Invoker func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) + +func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer, invoker Invoker) error { + sh.OnPreRead(r) + // Create a new codec request. + codecReq, errNew := codec.NewRequest(r) + defer func() { + if nil != codecReq { + codecReq.Complete() + } + }() + + if nil != errNew { + return errNew + } + sh.OnPostRead(r) + + result, err := invoker(codecReq) + + if nil != err { + sh.OnPreWriteError(w, err) + codecReq.WriteError(w, 400, err) + sh.OnPostWriteError(w, err) + } else { + sh.OnPreWriteResult(w, result) + codecReq.WriteResponse(w, result) + sh.OnPostWriteResult(w, result) + } + + return nil +} diff --git a/protocol/json/client_notify.go b/protocol/json/client_notify.go index 0a90db3..1f40d16 100644 --- a/protocol/json/client_notify.go +++ b/protocol/json/client_notify.go @@ -62,6 +62,13 @@ func (ccn *ClientCodecNotify) ReadParams(args []interface{}) error { return ccn.err } +func (ccn *ClientCodecNotify) Params() ([]byte, error) { + if ccn.err == nil && ccn.notify.Params != nil { + return *ccn.notify.Params, nil + } + return nil, ccn.err +} + func (ccn *ClientCodecNotify) Complete() { releaseClientCodecNotify(ccn) } diff --git a/protocol/json/server.go b/protocol/json/server.go index 8446066..7ad57c2 100644 --- a/protocol/json/server.go +++ b/protocol/json/server.go @@ -202,6 +202,13 @@ func (scr *ServerCodecRequest) ReadParams(args []interface{}) error { return scr.err } +func (scr *ServerCodecRequest) Params() ([]byte, error) { + if scr.err == nil && scr.request.Params != nil { + return *scr.request.Params, nil + } + return nil, scr.err +} + // WriteResponse encodes the response and writes it to the ResponseWriter. func (scr *ServerCodecRequest) WriteResponse(w io.Writer, reply interface{}) error { res := retainServerResponse(Version, reply, nil, scr.request.ID) diff --git a/protocol/registry_codec.go b/protocol/registry_codec.go index 9279fb7..421cd21 100644 --- a/protocol/registry_codec.go +++ b/protocol/registry_codec.go @@ -9,5 +9,6 @@ type RegistryCodec interface { Method() string // Reads the request filling the RPC method args. ReadParams(args []interface{}) error + Params() ([]byte, error) Complete() } diff --git a/server/handle.go b/server/handle.go new file mode 100644 index 0000000..9407471 --- /dev/null +++ b/server/handle.go @@ -0,0 +1,14 @@ +package server + +import ( + "io" + + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" +) + +func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error { + return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) { + return sh.Invoke(codecReq) + }) +} diff --git a/server/rpc_server_handlers.go b/server/rpc_server_handlers.go deleted file mode 100644 index cb6a3ea..0000000 --- a/server/rpc_server_handlers.go +++ /dev/null @@ -1,80 +0,0 @@ -package server - -import ( - "fmt" - "io" - "strings" - - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" -) - -type RPCServerHandlers struct { - RPCRegistry rpc.Registry - codecs map[string]protocol.ServerCodec -} - -// RegisterCodec adds a new codec to the server. -// -// Codecs are defined to process a given serialization scheme, e.g., JSON or -// XML. A codec is chosen based on the "Content-Type" header from the request, -// excluding the charset definition. -func (rpcSH *RPCServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) { - if nil == rpcSH.codecs { - rpcSH.codecs = make(map[string]protocol.ServerCodec) - } - rpcSH.codecs[strings.ToLower(contentType)] = codec -} - -func (rpcSH *RPCServerHandlers) GetContentType(r io.Reader) string { - return "" -} - -func (rpcSH *RPCServerHandlers) OnPreRead(r io.Reader) { - // no op -} - -func (rpcSH *RPCServerHandlers) OnPostRead(r io.Reader) { - // no op -} - -func (rpcSH *RPCServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (rpcSH *RPCServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (rpcSH *RPCServerHandlers) OnPreWriteError(w io.Writer, err error) { - // no op -} - -func (rpcSH *RPCServerHandlers) OnPostWriteError(w io.Writer, err error) { - // no op -} - -func (rpcSH *RPCServerHandlers) Validate() { - if nil == rpcSH.RPCRegistry { - panic("RPCRegistry must be specified.") - } -} - -func (rpcSH *RPCServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) { - var codec protocol.ServerCodec - if contentType == "" && len(rpcSH.codecs) == 1 { - // If Content-Type is not set and only one codec has been registered, - // then default to that codec. - for _, c := range rpcSH.codecs { - codec = c - } - } else if codec = rpcSH.codecs[strings.ToLower(contentType)]; codec == nil { - return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) - } - - return codec, nil -} - -func (rpcSH *RPCServerHandlers) invoke(codec protocol.RegistryCodec) (result interface{}, err error) { - return rpcSH.RPCRegistry.Invoke(codec) -} diff --git a/server/server.go b/server/server.go deleted file mode 100644 index e9ff5fb..0000000 --- a/server/server.go +++ /dev/null @@ -1,11 +0,0 @@ -package server - -import ( - "git.loafle.net/commons_go/server" -) - -func New(sh ServerHandler) server.Server { - s := server.New(sh) - - return s -} diff --git a/server/server_handler.go b/server/server_handler.go index cf02713..ba3d4bb 100644 --- a/server/server_handler.go +++ b/server/server_handler.go @@ -1,7 +1,12 @@ package server -import "git.loafle.net/commons_go/server" +import ( + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" +) type ServerHandler interface { - server.ServerHandler + rpc.ServerHandler + + Invoke(codec protocol.RegistryCodec) (result interface{}, err error) } diff --git a/server/server_handlers.go b/server/server_handlers.go index 69e7565..0e010af 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -1,67 +1,23 @@ package server import ( - "log" - "net" - - "git.loafle.net/commons_go/server" + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" ) type ServerHandlers struct { - server.ServerHandlers - - RPCServerHandler RPCServerHandler + rpc.ServerHandlers + RPCRegistry rpc.Registry } -func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { - contentType := sh.RPCServerHandler.GetContentType(conn) - codec, err := sh.RPCServerHandler.GetCodec(contentType) - if nil != err { - log.Printf("RPC Handle: %v", err) - doneChan <- struct{}{} - return - } - - for { - sh.RPCServerHandler.OnPreRead(conn) - // Create a new codec request. - codecReq, errNew := codec.NewRequest(conn) - if nil != errNew { - if sh.IsClientDisconnect(errNew) { - doneChan <- struct{}{} - return - } - log.Printf("RPC Handle: %v", errNew) - doneChan <- struct{}{} - return - } - sh.RPCServerHandler.OnPostRead(conn) - - result, err := sh.RPCServerHandler.invoke(codecReq) - - if nil != err { - sh.RPCServerHandler.OnPreWriteError(conn, err) - codecReq.WriteError(conn, 400, err) - sh.RPCServerHandler.OnPostWriteError(conn, err) - } else { - sh.RPCServerHandler.OnPreWriteResult(conn, result) - codecReq.WriteResponse(conn, result) - sh.RPCServerHandler.OnPostWriteResult(conn, result) - } - - select { - case <-stopChan: - return - default: - } - } - +func (sh *ServerHandlers) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) { + return sh.RPCRegistry.Invoke(codec) } func (sh *ServerHandlers) Validate() { sh.ServerHandlers.Validate() - if nil == sh.RPCServerHandler { - panic("RPCServerHandler must be specified.") + if nil == sh.RPCRegistry { + panic("RPCRegistry must be specified.") } } diff --git a/server/rpc_server_handler.go b/server_handler.go similarity index 73% rename from server/rpc_server_handler.go rename to server_handler.go index b0026d9..855ea7e 100644 --- a/server/rpc_server_handler.go +++ b/server_handler.go @@ -1,4 +1,4 @@ -package server +package rpc import ( "io" @@ -6,8 +6,7 @@ import ( "git.loafle.net/commons_go/rpc/protocol" ) -type RPCServerHandler interface { - GetContentType(r io.Reader) string +type ServerHandler interface { RegisterCodec(codec protocol.ServerCodec, contentType string) OnPreRead(r io.Reader) @@ -20,5 +19,4 @@ type RPCServerHandler interface { OnPostWriteError(w io.Writer, err error) GetCodec(contentType string) (protocol.ServerCodec, error) - invoke(codec protocol.RegistryCodec) (result interface{}, err error) } diff --git a/server_handlers.go b/server_handlers.go new file mode 100644 index 0000000..4a6c506 --- /dev/null +++ b/server_handlers.go @@ -0,0 +1,67 @@ +package rpc + +import ( + "fmt" + "io" + "strings" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type ServerHandlers struct { + codecs map[string]protocol.ServerCodec +} + +// RegisterCodec adds a new codec to the server. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (sh *ServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) { + if nil == sh.codecs { + sh.codecs = make(map[string]protocol.ServerCodec) + } + sh.codecs[strings.ToLower(contentType)] = codec +} + +func (sh *ServerHandlers) OnPreRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPostRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) Validate() { +} + +func (sh *ServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) { + var codec protocol.ServerCodec + if contentType == "" && len(sh.codecs) == 1 { + // If Content-Type is not set and only one codec has been registered, + // then default to that codec. + for _, c := range sh.codecs { + codec = c + } + } else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil { + return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) + } + + return codec, nil +}