This commit is contained in:
crusader 2017-11-08 15:35:27 +09:00
parent d466ca204b
commit b284554280
24 changed files with 437 additions and 278 deletions

View File

@ -1,19 +1,24 @@
package fasthttp
import (
"bytes"
"fmt"
"io"
"strings"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/server"
"github.com/valyala/fasthttp"
)
type FastHTTPAdapter struct {
registry rpc.Registry
RPCServerHandler server.ServerHandler
}
func New(rpcServerHandler server.ServerHandler) *FastHTTPAdapter {
return &FastHTTPAdapter{
RPCServerHandler: rpcServerHandler,
}
}
// FastHTTPHandler
func (a *FastHTTPAdapter) FastHTTPHandler(ctx *fasthttp.RequestCtx) {
if !ctx.IsPost() {
writeError(ctx, 405, "rpc: POST method required, received "+string(ctx.Method()))
@ -26,21 +31,18 @@ func (a *FastHTTPAdapter) FastHTTPHandler(ctx *fasthttp.RequestCtx) {
contentType = contentType[:idx]
}
// err := a.registry.Invoke(contentType, ctx.Request., ctx, beforeWrite, afterWrite)
codec, err := a.RPCServerHandler.GetCodec(contentType)
if nil != err {
writeError(ctx, 415, "rpc: Unsupported Media Type "+contentType)
return
}
// if nil != err {
// writeError(ctx, 400, err.Error())
// }
r := bytes.NewReader(ctx.PostBody())
}
func beforeWrite(w io.Writer) {
ctx := w.(*fasthttp.RequestCtx)
ctx.Response.Header.Set("x-content-type-options", "nosniff")
ctx.SetContentType("application/json; charset=utf-8")
}
func afterWrite(w io.Writer) {
if err := server.Handle(a.RPCServerHandler, codec, r, ctx); nil != err {
writeError(ctx, 500, err.Error())
return
}
}

View File

@ -0,0 +1,44 @@
package fasthttp
import (
"io"
"github.com/valyala/fasthttp"
"git.loafle.net/commons_go/rpc/server"
)
type ServerHandlers struct {
server.ServerHandlers
}
func (sh *ServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
ctx := w.(*fasthttp.RequestCtx)
ctx.Response.Header.Set("x-content-type-options", "nosniff")
ctx.SetContentType("application/json; charset=utf-8")
}
func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -2,20 +2,20 @@ package http
import (
"fmt"
"io"
"net/http"
"strings"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/server"
)
type HTTPAdapter struct {
registry rpc.Registry
http.Handler
RPCServerHandler server.ServerHandler
}
func NewAdapter(registry rpc.Registry) *HTTPAdapter {
func New(rpcServerHandler server.ServerHandler) *HTTPAdapter {
return &HTTPAdapter{
registry: registry,
RPCServerHandler: rpcServerHandler,
}
}
@ -32,22 +32,16 @@ func (a *HTTPAdapter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
contentType = contentType[:idx]
}
err := a.registry.Invoke(contentType, r.Body, w, beforeWrite, afterWrite)
r.Body.Close()
codec, err := a.RPCServerHandler.GetCodec(contentType)
if nil != err {
writeError(w, 400, err.Error())
writeError(w, 415, "rpc: Unsupported Media Type "+contentType)
return
}
}
func beforeWrite(w io.Writer) {
writer := w.(http.ResponseWriter)
writer.Header().Set("x-content-type-options", "nosniff")
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
}
func afterWrite(w io.Writer) {
if err := server.Handle(a.RPCServerHandler, codec, r.Body, w); nil != err {
writeError(w, 500, err.Error())
return
}
}

View File

@ -0,0 +1,43 @@
package http
import (
"io"
"net/http"
"git.loafle.net/commons_go/rpc/server"
)
type ServerHandlers struct {
server.ServerHandlers
}
func (sh *ServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
writer := w.(http.ResponseWriter)
writer.Header().Set("x-content-type-options", "nosniff")
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
}
func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -0,0 +1,50 @@
package server
import (
"log"
"net"
rpcServer "git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/server"
)
type ServerHandlers struct {
server.ServerHandlers
RPCServerHandler rpcServer.ServerHandler
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := "json"
codec, err := sh.RPCServerHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
for {
if err := rpcServer.Handle(sh.RPCServerHandler, codec, conn, conn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
select {
case <-stopChan:
return
default:
}
}
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if nil == sh.RPCServerHandler {
panic("RPCServerHandler must be specified.")
}
}

View File

@ -0,0 +1,82 @@
package fasthttp
import (
"io"
"log"
"github.com/valyala/fasthttp"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/commons_go/websocket_fasthttp/websocket"
rpcServer "git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/server"
)
type SocketHandlers struct {
cwf.SocketHandlers
RPCServerHandler rpcServer.ServerHandler
}
func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (bool, *fasthttp.ResponseHeader) {
return true, nil
}
func (sh *SocketHandlers) Handle(conn *websocket.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := "json"
codec, err := sh.RPCServerHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var messageType int
var r io.Reader
var w io.WriteCloser
// conn.SetReadLimit(maxMessageSize)
// conn.SetReadDeadline(time.Now().Add(pongWait))
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if messageType, r, err = conn.NextReader(); nil != err {
doneChan <- struct{}{}
return
}
if w, err = conn.NextWriter(messageType); nil != err {
doneChan <- struct{}{}
return
}
if err = rpcServer.Handle(sh.RPCServerHandler, codec, r, w); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = w.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
func (sh *SocketHandlers) Validate() {
sh.SocketHandlers.Validate()
if nil == sh.RPCServerHandler {
panic("RPCServerHandler must be specified.")
}
}

View File

@ -1,11 +0,0 @@
package gateway
import (
"git.loafle.net/commons_go/server"
)
func New(sh ServerHandler) server.Server {
s := server.New(sh)
return s
}

18
gateway/handle.go Normal file
View File

@ -0,0 +1,18 @@
package gateway
import (
"io"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error {
return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) {
var params []byte
if params, err = codecReq.Params(); nil != err {
return nil, err
}
return sh.Invoke(codecReq.Method(), params)
})
}

View File

@ -1,11 +0,0 @@
package gateway
import (
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/server"
)
type RPCGetewayHandler interface {
server.RPCServerHandler
Handle(codecReq protocol.ServerCodecRequest) (interface{}, error)
}

View File

@ -1,20 +0,0 @@
package gateway
import (
"errors"
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/server"
)
type RPCGetewayHandlers struct {
server.RPCServerHandlers
}
func (rpcGH *RPCGetewayHandlers) Handle(codecReq protocol.ServerCodecRequest) (result interface{}, err error) {
return nil, errors.New("RPC Gateway: Handler method[Handle] is not implement")
}
func (rpcGH *RPCGetewayHandlers) Validate() {
rpcGH.RPCServerHandlers.Validate()
}

11
gateway/server_handler.go Normal file
View File

@ -0,0 +1,11 @@
package gateway
import (
"git.loafle.net/commons_go/rpc"
)
type ServerHandler interface {
rpc.ServerHandler
Invoke(method string, params []byte) (result interface{}, err error)
}

View File

@ -1,67 +1,20 @@
package gateway
import (
"log"
"net"
"errors"
"git.loafle.net/commons_go/server"
"git.loafle.net/commons_go/rpc"
)
type ServerHandlers struct {
server.ServerHandlers
RPCGetewayHandler RPCGetewayHandler
rpc.ServerHandlers
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := sh.RPCGetewayHandler.GetContentType(conn)
codec, err := sh.RPCGetewayHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
for {
sh.RPCGetewayHandler.OnPreRead(conn)
// Create a new codec request.
codecReq, errNew := codec.NewRequest(conn)
if nil != errNew {
if sh.IsClientDisconnect(errNew) {
doneChan <- struct{}{}
return
}
log.Printf("RPC Handle: %v", errNew)
doneChan <- struct{}{}
return
}
sh.RPCGetewayHandler.OnPostRead(conn)
result, err := sh.RPCGetewayHandler.Handle(codecReq)
if nil != err {
sh.RPCGetewayHandler.OnPreWriteError(conn, err)
codecReq.WriteError(conn, 400, err)
sh.RPCGetewayHandler.OnPostWriteError(conn, err)
} else {
sh.RPCGetewayHandler.OnPreWriteResult(conn, result)
codecReq.WriteResponse(conn, result)
sh.RPCGetewayHandler.OnPostWriteResult(conn, result)
}
select {
case <-stopChan:
return
default:
}
}
func (sh *ServerHandlers) Invoke(method string, params []byte) (result interface{}, err error) {
return nil, errors.New("Server: Handler method[Invoke] of Server is not implement")
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if nil == sh.RPCGetewayHandler {
panic("RPCGetewayHandler must be specified.")
}
}

View File

@ -4,3 +4,4 @@ import:
version: v20160617
- package: git.loafle.net/commons_go/server
- package: gopkg.in/natefinch/npipe.v2
- package: git.loafle.net/commons_go/websocket_fasthttp

39
handle.go Normal file
View File

@ -0,0 +1,39 @@
package rpc
import (
"io"
"git.loafle.net/commons_go/rpc/protocol"
)
type Invoker func(codecReq protocol.ServerCodecRequest) (result interface{}, err error)
func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer, invoker Invoker) error {
sh.OnPreRead(r)
// Create a new codec request.
codecReq, errNew := codec.NewRequest(r)
defer func() {
if nil != codecReq {
codecReq.Complete()
}
}()
if nil != errNew {
return errNew
}
sh.OnPostRead(r)
result, err := invoker(codecReq)
if nil != err {
sh.OnPreWriteError(w, err)
codecReq.WriteError(w, 400, err)
sh.OnPostWriteError(w, err)
} else {
sh.OnPreWriteResult(w, result)
codecReq.WriteResponse(w, result)
sh.OnPostWriteResult(w, result)
}
return nil
}

View File

@ -62,6 +62,13 @@ func (ccn *ClientCodecNotify) ReadParams(args []interface{}) error {
return ccn.err
}
func (ccn *ClientCodecNotify) Params() ([]byte, error) {
if ccn.err == nil && ccn.notify.Params != nil {
return *ccn.notify.Params, nil
}
return nil, ccn.err
}
func (ccn *ClientCodecNotify) Complete() {
releaseClientCodecNotify(ccn)
}

View File

@ -202,6 +202,13 @@ func (scr *ServerCodecRequest) ReadParams(args []interface{}) error {
return scr.err
}
func (scr *ServerCodecRequest) Params() ([]byte, error) {
if scr.err == nil && scr.request.Params != nil {
return *scr.request.Params, nil
}
return nil, scr.err
}
// WriteResponse encodes the response and writes it to the ResponseWriter.
func (scr *ServerCodecRequest) WriteResponse(w io.Writer, reply interface{}) error {
res := retainServerResponse(Version, reply, nil, scr.request.ID)

View File

@ -9,5 +9,6 @@ type RegistryCodec interface {
Method() string
// Reads the request filling the RPC method args.
ReadParams(args []interface{}) error
Params() ([]byte, error)
Complete()
}

14
server/handle.go Normal file
View File

@ -0,0 +1,14 @@
package server
import (
"io"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error {
return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) {
return sh.Invoke(codecReq)
})
}

View File

@ -1,80 +0,0 @@
package server
import (
"fmt"
"io"
"strings"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
type RPCServerHandlers struct {
RPCRegistry rpc.Registry
codecs map[string]protocol.ServerCodec
}
// RegisterCodec adds a new codec to the server.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
func (rpcSH *RPCServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) {
if nil == rpcSH.codecs {
rpcSH.codecs = make(map[string]protocol.ServerCodec)
}
rpcSH.codecs[strings.ToLower(contentType)] = codec
}
func (rpcSH *RPCServerHandlers) GetContentType(r io.Reader) string {
return ""
}
func (rpcSH *RPCServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (rpcSH *RPCServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (rpcSH *RPCServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (rpcSH *RPCServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (rpcSH *RPCServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (rpcSH *RPCServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (rpcSH *RPCServerHandlers) Validate() {
if nil == rpcSH.RPCRegistry {
panic("RPCRegistry must be specified.")
}
}
func (rpcSH *RPCServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) {
var codec protocol.ServerCodec
if contentType == "" && len(rpcSH.codecs) == 1 {
// If Content-Type is not set and only one codec has been registered,
// then default to that codec.
for _, c := range rpcSH.codecs {
codec = c
}
} else if codec = rpcSH.codecs[strings.ToLower(contentType)]; codec == nil {
return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType)
}
return codec, nil
}
func (rpcSH *RPCServerHandlers) invoke(codec protocol.RegistryCodec) (result interface{}, err error) {
return rpcSH.RPCRegistry.Invoke(codec)
}

View File

@ -1,11 +0,0 @@
package server
import (
"git.loafle.net/commons_go/server"
)
func New(sh ServerHandler) server.Server {
s := server.New(sh)
return s
}

View File

@ -1,7 +1,12 @@
package server
import "git.loafle.net/commons_go/server"
import (
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServerHandler interface {
server.ServerHandler
rpc.ServerHandler
Invoke(codec protocol.RegistryCodec) (result interface{}, err error)
}

View File

@ -1,67 +1,23 @@
package server
import (
"log"
"net"
"git.loafle.net/commons_go/server"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServerHandlers struct {
server.ServerHandlers
RPCServerHandler RPCServerHandler
rpc.ServerHandlers
RPCRegistry rpc.Registry
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := sh.RPCServerHandler.GetContentType(conn)
codec, err := sh.RPCServerHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
for {
sh.RPCServerHandler.OnPreRead(conn)
// Create a new codec request.
codecReq, errNew := codec.NewRequest(conn)
if nil != errNew {
if sh.IsClientDisconnect(errNew) {
doneChan <- struct{}{}
return
}
log.Printf("RPC Handle: %v", errNew)
doneChan <- struct{}{}
return
}
sh.RPCServerHandler.OnPostRead(conn)
result, err := sh.RPCServerHandler.invoke(codecReq)
if nil != err {
sh.RPCServerHandler.OnPreWriteError(conn, err)
codecReq.WriteError(conn, 400, err)
sh.RPCServerHandler.OnPostWriteError(conn, err)
} else {
sh.RPCServerHandler.OnPreWriteResult(conn, result)
codecReq.WriteResponse(conn, result)
sh.RPCServerHandler.OnPostWriteResult(conn, result)
}
select {
case <-stopChan:
return
default:
}
}
func (sh *ServerHandlers) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) {
return sh.RPCRegistry.Invoke(codec)
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if nil == sh.RPCServerHandler {
panic("RPCServerHandler must be specified.")
if nil == sh.RPCRegistry {
panic("RPCRegistry must be specified.")
}
}

View File

@ -1,4 +1,4 @@
package server
package rpc
import (
"io"
@ -6,8 +6,7 @@ import (
"git.loafle.net/commons_go/rpc/protocol"
)
type RPCServerHandler interface {
GetContentType(r io.Reader) string
type ServerHandler interface {
RegisterCodec(codec protocol.ServerCodec, contentType string)
OnPreRead(r io.Reader)
@ -20,5 +19,4 @@ type RPCServerHandler interface {
OnPostWriteError(w io.Writer, err error)
GetCodec(contentType string) (protocol.ServerCodec, error)
invoke(codec protocol.RegistryCodec) (result interface{}, err error)
}

67
server_handlers.go Normal file
View File

@ -0,0 +1,67 @@
package rpc
import (
"fmt"
"io"
"strings"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServerHandlers struct {
codecs map[string]protocol.ServerCodec
}
// RegisterCodec adds a new codec to the server.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
func (sh *ServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) {
if nil == sh.codecs {
sh.codecs = make(map[string]protocol.ServerCodec)
}
sh.codecs[strings.ToLower(contentType)] = codec
}
func (sh *ServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) Validate() {
}
func (sh *ServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) {
var codec protocol.ServerCodec
if contentType == "" && len(sh.codecs) == 1 {
// If Content-Type is not set and only one codec has been registered,
// then default to that codec.
for _, c := range sh.codecs {
codec = c
}
} else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil {
return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType)
}
return codec, nil
}