ing
This commit is contained in:
parent
942a854fe1
commit
6aa5c1536d
|
@ -303,7 +303,7 @@ func (c *client) rpcReader(readerDone chan<- error) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
msg, err := c.ch.GetCodec().NewMessage(c.conn)
|
resCodec, err := c.ch.GetCodec().NewResponse(c.conn)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
||||||
logging.Logger().Info("Client: disconnected from server")
|
logging.Logger().Info("Client: disconnected from server")
|
||||||
|
@ -314,12 +314,10 @@ func (c *client) rpcReader(readerDone chan<- error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
switch msg.MessageType() {
|
if nil != resCodec.ID() {
|
||||||
case protocol.MessageTypeResponse:
|
err = c.handleResponse(resCodec)
|
||||||
c.handleResponse(msg)
|
} else {
|
||||||
case protocol.MessageTypeNotification:
|
err = c.handleNotification(resCodec)
|
||||||
c.handleNotification(msg)
|
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if nil != err {
|
if nil != err {
|
||||||
|
@ -330,14 +328,7 @@ func (c *client) rpcReader(readerDone chan<- error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *client) handleResponse(msg protocol.ClientMessageCodec) error {
|
func (c *client) handleResponse(resCodec protocol.ClientResponseCodec) error {
|
||||||
codec, err := msg.MessageCodec()
|
|
||||||
if nil != err {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
resCodec := codec.(protocol.ClientResponseCodec)
|
|
||||||
|
|
||||||
c.pendingRequestsLock.Lock()
|
c.pendingRequestsLock.Lock()
|
||||||
id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint()
|
id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint()
|
||||||
|
|
||||||
|
@ -369,14 +360,12 @@ func (c *client) handleResponse(msg protocol.ClientMessageCodec) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *client) handleNotification(msg protocol.ClientMessageCodec) error {
|
func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error {
|
||||||
codec, err := msg.MessageCodec()
|
notiCodec, err := resCodec.Notification()
|
||||||
if nil != err {
|
if nil != err {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
notiCodec := codec.(protocol.ClientNotificationCodec)
|
|
||||||
|
|
||||||
_, err = c.ch.GetRPCRegistry().Invoke(notiCodec)
|
_, err = c.ch.GetRPCRegistry().Invoke(notiCodec)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -10,14 +10,14 @@ import (
|
||||||
type ServletHandlers struct {
|
type ServletHandlers struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
|
func (sh *ServletHandlers) ReadRequest(servletCTX rpc.RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
|
||||||
nConn := conn.(net.Conn)
|
nConn := conn.(net.Conn)
|
||||||
requestCodec, err := codec.NewRequest(nConn)
|
requestCodec, err := codec.NewRequest(nConn)
|
||||||
|
|
||||||
return requestCodec, err
|
return requestCodec, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
|
func (sh *ServletHandlers) WriteResponse(servletCTX rpc.RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
|
||||||
nConn := conn.(net.Conn)
|
nConn := conn.(net.Conn)
|
||||||
|
|
||||||
if nil != err {
|
if nil != err {
|
||||||
|
@ -33,7 +33,7 @@ func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn inte
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) SendNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error {
|
func (sh *ServletHandlers) WriteNotification(servletCTX rpc.RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error {
|
||||||
nConn := conn.(net.Conn)
|
nConn := conn.(net.Conn)
|
||||||
|
|
||||||
if wErr := codec.WriteNotification(nConn, method, args); nil != wErr {
|
if wErr := codec.WriteNotification(nConn, method, args); nil != wErr {
|
||||||
|
|
|
@ -1,17 +1,16 @@
|
||||||
package fasthttp
|
package fasthttp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc"
|
"git.loafle.net/commons_go/rpc"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
cwf "git.loafle.net/commons_go/websocket_fasthttp"
|
cwf "git.loafle.net/commons_go/websocket_fasthttp"
|
||||||
|
"git.loafle.net/commons_go/websocket_fasthttp/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServletHandlers struct {
|
type ServletHandlers struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
|
func (sh *ServletHandlers) ReadRequest(servletCTX rpc.RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
|
||||||
soc := conn.(cwf.Socket)
|
soc := conn.(cwf.Socket)
|
||||||
_, r, err := soc.NextReader()
|
_, r, err := soc.NextReader()
|
||||||
|
|
||||||
|
@ -20,7 +19,7 @@ func (sh *ServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec proto
|
||||||
return requestCodec, err
|
return requestCodec, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
|
func (sh *ServletHandlers) WriteResponse(servletCTX rpc.RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
|
||||||
soc := conn.(cwf.Socket)
|
soc := conn.(cwf.Socket)
|
||||||
|
|
||||||
wc, wErr := soc.NextWriter(websocket.TextMessage)
|
wc, wErr := soc.NextWriter(websocket.TextMessage)
|
||||||
|
@ -41,7 +40,7 @@ func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn inte
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) SendNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error {
|
func (sh *ServletHandlers) WriteNotification(servletCTX rpc.RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error {
|
||||||
soc := conn.(cwf.Socket)
|
soc := conn.(cwf.Socket)
|
||||||
|
|
||||||
wc, wErr := soc.NextWriter(websocket.TextMessage)
|
wc, wErr := soc.NextWriter(websocket.TextMessage)
|
||||||
|
|
|
@ -3,7 +3,7 @@ package rpc
|
||||||
import cuc "git.loafle.net/commons_go/util/context"
|
import cuc "git.loafle.net/commons_go/util/context"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultPendingMessages = 32 * 1024
|
DefaultPendingResponses = 32 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -4,40 +4,17 @@ import (
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
MessageTypeUnknown MessageType = iota
|
|
||||||
MessageTypeRequest
|
|
||||||
MessageTypeResponse
|
|
||||||
MessageTypeNotification
|
|
||||||
)
|
|
||||||
|
|
||||||
// ClientCodec creates a ClientCodecRequest to process each request.
|
// ClientCodec creates a ClientCodecRequest to process each request.
|
||||||
type ClientCodec interface {
|
type ClientCodec interface {
|
||||||
WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error
|
WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error
|
||||||
NewMessage(rc io.Reader) (ClientMessageCodec, error)
|
NewResponse(rc io.Reader) (ClientResponseCodec, error)
|
||||||
}
|
|
||||||
|
|
||||||
type ClientRequestCodec interface {
|
|
||||||
RegistryCodec
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientMessageCodec decodes a response or notification using a specific
|
|
||||||
// serialization scheme.
|
|
||||||
type ClientMessageCodec interface {
|
|
||||||
MessageType() MessageType
|
|
||||||
// Reads the message filling the RPC response or notification.
|
|
||||||
MessageCodec() (interface{}, error)
|
|
||||||
|
|
||||||
Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientResponseCodec interface {
|
type ClientResponseCodec interface {
|
||||||
|
Notification() (ClientNotificationCodec, error)
|
||||||
Result(result interface{}) error
|
Result(result interface{}) error
|
||||||
Error() error
|
Error() error
|
||||||
ID() interface{}
|
ID() interface{}
|
||||||
Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientNotificationCodec interface {
|
type ClientNotificationCodec interface {
|
||||||
|
|
|
@ -2,21 +2,12 @@ package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/codec"
|
"git.loafle.net/commons_go/rpc/codec"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
// clientMessage represents a JSON-RPC message sent to a client.
|
|
||||||
type clientMessage struct {
|
|
||||||
Version string `json:"jsonrpc"`
|
|
||||||
MessageType protocol.MessageType `json:"messageType"`
|
|
||||||
Message *json.RawMessage `json:"message"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Codec
|
// Codec
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
@ -37,10 +28,12 @@ type ClientCodec struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error {
|
func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error {
|
||||||
req := retainClientRequest(method, args, id)
|
req := &clientRequest{
|
||||||
defer func() {
|
Version: Version,
|
||||||
releaseClientRequest(req)
|
Method: method,
|
||||||
}()
|
Params: args,
|
||||||
|
ID: id,
|
||||||
|
}
|
||||||
|
|
||||||
encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w))
|
encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w))
|
||||||
if err := encoder.Encode(req); nil != err {
|
if err := encoder.Encode(req); nil != err {
|
||||||
|
@ -51,96 +44,6 @@ func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMessage returns a ClientMessageCodec.
|
// NewMessage returns a ClientMessageCodec.
|
||||||
func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) {
|
func (cc *ClientCodec) NewResponse(r io.Reader) (protocol.ClientResponseCodec, error) {
|
||||||
return newClientMessageCodec(r, cc.codecSel.SelectByReader(r))
|
return newClientResponseCodec(r, cc.codecSel.SelectByReader(r))
|
||||||
}
|
|
||||||
|
|
||||||
// newClientMessageCodec returns a new ClientMessageCodec.
|
|
||||||
func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) {
|
|
||||||
msg := retainClientMessage()
|
|
||||||
err := json.NewDecoder(r).Decode(msg)
|
|
||||||
if err != nil {
|
|
||||||
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = &Error{
|
|
||||||
Code: E_PARSE,
|
|
||||||
Message: err.Error(),
|
|
||||||
Data: msg,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if msg.Version != Version {
|
|
||||||
err = &Error{
|
|
||||||
Code: E_INVALID_REQ,
|
|
||||||
Message: "jsonrpc must be " + Version,
|
|
||||||
Data: msg,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return retainClientMessageCodec(msg, err, codec), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type ClientMessageCodec struct {
|
|
||||||
msg *clientMessage
|
|
||||||
err error
|
|
||||||
codec codec.Codec
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ccm *ClientMessageCodec) MessageType() protocol.MessageType {
|
|
||||||
return ccm.msg.MessageType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) {
|
|
||||||
switch ccm.msg.MessageType {
|
|
||||||
case protocol.MessageTypeResponse:
|
|
||||||
return newClientResponseCodec(ccm.msg.Message, ccm.codec)
|
|
||||||
case protocol.MessageTypeNotification:
|
|
||||||
return newClientNotificationCodec(ccm.msg.Message, ccm.codec)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ccm *ClientMessageCodec) Close() {
|
|
||||||
if nil != ccm.msg {
|
|
||||||
releaseClientMessage(ccm.msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
releaseClientMessageCodec(ccm)
|
|
||||||
}
|
|
||||||
|
|
||||||
var clientMessagePool sync.Pool
|
|
||||||
|
|
||||||
func retainClientMessage() *clientMessage {
|
|
||||||
v := clientMessagePool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &clientMessage{}
|
|
||||||
}
|
|
||||||
return v.(*clientMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseClientMessage(cm *clientMessage) {
|
|
||||||
clientMessagePool.Put(cm)
|
|
||||||
}
|
|
||||||
|
|
||||||
var clientMessageCodecPool sync.Pool
|
|
||||||
|
|
||||||
func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec {
|
|
||||||
var ccm *ClientMessageCodec
|
|
||||||
v := clientMessageCodecPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
ccm = &ClientMessageCodec{}
|
|
||||||
} else {
|
|
||||||
ccm = v.(*ClientMessageCodec)
|
|
||||||
}
|
|
||||||
|
|
||||||
ccm.msg = msg
|
|
||||||
ccm.err = err
|
|
||||||
ccm.codec = codec
|
|
||||||
|
|
||||||
return ccm
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseClientMessageCodec(cr *ClientMessageCodec) {
|
|
||||||
|
|
||||||
clientMessageCodecPool.Put(cr)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,8 @@ package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/codec"
|
"git.loafle.net/commons_go/rpc/codec"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
@ -19,79 +17,61 @@ type clientNotification struct {
|
||||||
Params *json.RawMessage `json:"params,omitempty"`
|
Params *json.RawMessage `json:"params,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// newClientNotificationCodec returns a new ClientNotificationCodec.
|
|
||||||
func newClientNotificationCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientNotificationCodec, error) {
|
|
||||||
// Decode the request body and check if RPC method is valid.
|
|
||||||
cnc := retainClientNotificationCodec()
|
|
||||||
|
|
||||||
if err := json.Unmarshal(*raw, &cnc.notification); nil != err {
|
|
||||||
releaseClientNotificationCodec(cnc)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return cnc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientNotificationCodec decodes and encodes a single notification.
|
// ClientNotificationCodec decodes and encodes a single notification.
|
||||||
type ClientNotificationCodec struct {
|
type ClientNotificationCodec struct {
|
||||||
notification clientNotification
|
noti *clientNotification
|
||||||
err error
|
err error
|
||||||
|
codec codec.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (crc *ClientNotificationCodec) Method() string {
|
func (cnc *ClientNotificationCodec) Method() string {
|
||||||
return crc.notification.Method
|
return cnc.noti.Method
|
||||||
}
|
}
|
||||||
|
|
||||||
func (crc *ClientNotificationCodec) ReadParams(args []interface{}) error {
|
func (cnc *ClientNotificationCodec) ReadParams(args []interface{}) error {
|
||||||
if crc.err == nil && crc.notification.Params != nil {
|
if cnc.err == nil && cnc.noti.Params != nil {
|
||||||
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
|
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
|
||||||
// JSON params structured object. Unmarshal to the args object.
|
// JSON params structured object. Unmarshal to the args object.
|
||||||
if err := json.Unmarshal(*crc.notification.Params, &args); err != nil {
|
raws := make([]json.RawMessage, len(args))
|
||||||
crc.err = &Error{
|
if err := json.Unmarshal(*cnc.noti.Params, &raws); err != nil {
|
||||||
|
cnc.err = &Error{
|
||||||
Code: E_INVALID_REQ,
|
Code: E_INVALID_REQ,
|
||||||
Message: err.Error(),
|
Message: err.Error(),
|
||||||
Data: crc.notification.Params,
|
Data: cnc.noti.Params,
|
||||||
|
}
|
||||||
|
return cnc.err
|
||||||
|
}
|
||||||
|
|
||||||
|
for indexI := 0; indexI < len(args); indexI++ {
|
||||||
|
raw := raws[indexI]
|
||||||
|
arg := args[indexI]
|
||||||
|
if err := json.Unmarshal(raw, &arg); err != nil {
|
||||||
|
cnc.err = &Error{
|
||||||
|
Code: E_INVALID_REQ,
|
||||||
|
Message: err.Error(),
|
||||||
|
Data: cnc.noti.Params,
|
||||||
|
}
|
||||||
|
return cnc.err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return crc.err
|
return cnc.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (crc *ClientNotificationCodec) Params() ([]string, error) {
|
func (cnc *ClientNotificationCodec) Params() ([]string, error) {
|
||||||
if crc.err == nil && crc.notification.Params != nil {
|
if cnc.err == nil && cnc.noti.Params != nil {
|
||||||
var results []string
|
var results []string
|
||||||
|
|
||||||
if err := json.Unmarshal(*crc.notification.Params, &results); err != nil {
|
if err := json.Unmarshal(*cnc.noti.Params, &results); err != nil {
|
||||||
crc.err = &Error{
|
cnc.err = &Error{
|
||||||
Code: E_INVALID_REQ,
|
Code: E_INVALID_REQ,
|
||||||
Message: err.Error(),
|
Message: err.Error(),
|
||||||
Data: crc.notification.Params,
|
Data: cnc.noti.Params,
|
||||||
}
|
}
|
||||||
return nil, crc.err
|
return nil, cnc.err
|
||||||
}
|
}
|
||||||
|
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
return nil, crc.err
|
return nil, cnc.err
|
||||||
}
|
|
||||||
|
|
||||||
func (crc *ClientNotificationCodec) Close() {
|
|
||||||
releaseClientNotificationCodec(crc)
|
|
||||||
}
|
|
||||||
|
|
||||||
var clientNotificationCodecPool sync.Pool
|
|
||||||
|
|
||||||
func retainClientNotificationCodec() *ClientNotificationCodec {
|
|
||||||
v := clientNotificationCodecPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &ClientNotificationCodec{}
|
|
||||||
}
|
|
||||||
return v.(*ClientNotificationCodec)
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseClientNotificationCodec(crc *ClientNotificationCodec) {
|
|
||||||
crc.notification.Method = ""
|
|
||||||
crc.notification.Params = nil
|
|
||||||
|
|
||||||
clientNotificationCodecPool.Put(crc)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
package json
|
package json
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Request and Response
|
// Request and Response
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
@ -21,31 +19,3 @@ type clientRequest struct {
|
||||||
// response with the request that it is replying to.
|
// response with the request that it is replying to.
|
||||||
ID interface{} `json:"id"`
|
ID interface{} `json:"id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var clientRequestPool sync.Pool
|
|
||||||
|
|
||||||
func retainClientRequest(method string, params interface{}, id interface{}) *clientRequest {
|
|
||||||
var cr *clientRequest
|
|
||||||
v := clientRequestPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
cr = &clientRequest{}
|
|
||||||
} else {
|
|
||||||
cr = v.(*clientRequest)
|
|
||||||
}
|
|
||||||
|
|
||||||
cr.Version = Version
|
|
||||||
cr.Method = method
|
|
||||||
cr.Params = params
|
|
||||||
cr.ID = id
|
|
||||||
|
|
||||||
return cr
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseClientRequest(cr *clientRequest) {
|
|
||||||
cr.Version = ""
|
|
||||||
cr.Method = ""
|
|
||||||
cr.Params = nil
|
|
||||||
cr.ID = nil
|
|
||||||
|
|
||||||
clientRequestPool.Put(cr)
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ package json
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"io"
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/codec"
|
"git.loafle.net/commons_go/rpc/codec"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
@ -14,73 +14,76 @@ import (
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// clientResponse represents a JSON-RPC response returned to a client.
|
// clientResponse represents a JSON-RPC response returned to a client.
|
||||||
type clientResponse struct {
|
type clientResponse struct {
|
||||||
|
Version string `json:"jsonrpc"`
|
||||||
Result *json.RawMessage `json:"result,omitempty"`
|
Result *json.RawMessage `json:"result,omitempty"`
|
||||||
Error error `json:"error,omitempty"`
|
Error error `json:"error,omitempty"`
|
||||||
ID interface{} `json:"id,omitempty"`
|
ID interface{} `json:"id,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// newClientResponseCodec returns a new ClientResponseCodec.
|
|
||||||
func newClientResponseCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientResponseCodec, error) {
|
|
||||||
// Decode the request body and check if RPC method is valid.
|
|
||||||
ccr := retainClientResponseCodec()
|
|
||||||
|
|
||||||
err := json.Unmarshal(*raw, &ccr.res)
|
|
||||||
if err != nil {
|
|
||||||
releaseClientResponseCodec(ccr)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if nil == ccr.res.Result && nil == ccr.res.Error {
|
|
||||||
releaseClientResponseCodec(ccr)
|
|
||||||
return nil, fmt.Errorf("This is not Response")
|
|
||||||
}
|
|
||||||
|
|
||||||
return ccr, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientResponseCodec decodes and encodes a single request.
|
// ClientResponseCodec decodes and encodes a single request.
|
||||||
type ClientResponseCodec struct {
|
type ClientResponseCodec struct {
|
||||||
res clientResponse
|
res *clientResponse
|
||||||
err error
|
err error
|
||||||
|
codec codec.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ccr *ClientResponseCodec) ID() interface{} {
|
func (crc *ClientResponseCodec) ID() interface{} {
|
||||||
return ccr.res.ID
|
return crc.res.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ccr *ClientResponseCodec) Result(result interface{}) error {
|
func (crc *ClientResponseCodec) Result(result interface{}) error {
|
||||||
if ccr.err == nil && ccr.res.Result != nil {
|
if nil == crc.err && nil != crc.res.Result {
|
||||||
if err := json.Unmarshal(*ccr.res.Result, &result); err != nil {
|
if err := json.Unmarshal(*crc.res.Result, result); nil != err {
|
||||||
params := [1]interface{}{result}
|
crc.err = &Error{
|
||||||
if err = json.Unmarshal(*ccr.res.Result, ¶ms); err != nil {
|
Code: E_PARSE,
|
||||||
ccr.err = err
|
Message: err.Error(),
|
||||||
|
Data: crc.res.Result,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ccr.err
|
|
||||||
|
return crc.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ccr *ClientResponseCodec) Error() error {
|
func (crc *ClientResponseCodec) Error() error {
|
||||||
return ccr.res.Error
|
return crc.res.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ccr *ClientResponseCodec) Close() {
|
func (crc *ClientResponseCodec) Notification() (protocol.ClientNotificationCodec, error) {
|
||||||
releaseClientResponseCodec(ccr)
|
if nil != crc.res.ID || nil == crc.res.Result {
|
||||||
}
|
return nil, fmt.Errorf("RPC[JSON]: This is not notification")
|
||||||
|
|
||||||
var clientResponseCodecPool sync.Pool
|
|
||||||
|
|
||||||
func retainClientResponseCodec() *ClientResponseCodec {
|
|
||||||
v := clientResponseCodecPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &ClientResponseCodec{}
|
|
||||||
}
|
}
|
||||||
return v.(*ClientResponseCodec)
|
|
||||||
|
noti := &clientNotification{}
|
||||||
|
err := json.Unmarshal(*crc.res.Result, noti)
|
||||||
|
if nil != err {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ClientNotificationCodec{noti: noti, err: err, codec: crc.codec}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func releaseClientResponseCodec(ccr *ClientResponseCodec) {
|
// newClientMessageCodec returns a new ClientMessageCodec.
|
||||||
ccr.res.Result = nil
|
func newClientResponseCodec(r io.Reader, codec codec.Codec) (protocol.ClientResponseCodec, error) {
|
||||||
ccr.res.Error = nil
|
res := &clientResponse{}
|
||||||
ccr.res.ID = 0
|
err := json.NewDecoder(r).Decode(res)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = &Error{
|
||||||
|
Code: E_PARSE,
|
||||||
|
Message: err.Error(),
|
||||||
|
Data: res,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if res.Version != Version {
|
||||||
|
err = &Error{
|
||||||
|
Code: E_INVALID_REQ,
|
||||||
|
Message: "jsonrpc must be " + Version,
|
||||||
|
Data: res,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
clientResponseCodecPool.Put(ccr)
|
return &ClientResponseCodec{res: res, err: err, codec: codec}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package json
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/codec"
|
"git.loafle.net/commons_go/rpc/codec"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
@ -11,13 +10,6 @@ import (
|
||||||
|
|
||||||
var null = json.RawMessage([]byte("null"))
|
var null = json.RawMessage([]byte("null"))
|
||||||
|
|
||||||
type serverMessage struct {
|
|
||||||
// JSON-RPC protocol.
|
|
||||||
Version string `json:"jsonrpc"`
|
|
||||||
MessageType protocol.MessageType `json:"messageType"`
|
|
||||||
Message interface{} `json:"message"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Codec
|
// Codec
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
@ -44,44 +36,13 @@ func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, err
|
||||||
|
|
||||||
// WriteNotification send a notification from server to client.
|
// WriteNotification send a notification from server to client.
|
||||||
func (sc *ServerCodec) WriteNotification(w io.Writer, method string, args interface{}) error {
|
func (sc *ServerCodec) WriteNotification(w io.Writer, method string, args interface{}) error {
|
||||||
noti := retainServerNotification(method, args)
|
noti := &serverNotification{Method: method, Params: args}
|
||||||
defer func() {
|
res := &serverResponse{Version: Version, Result: noti}
|
||||||
releaseServerNotification(noti)
|
|
||||||
}()
|
|
||||||
msg := retainServerMessage(protocol.MessageTypeNotification, noti)
|
|
||||||
defer func() {
|
|
||||||
releaseServerMessage(msg)
|
|
||||||
}()
|
|
||||||
encoder := json.NewEncoder(sc.codecSel.SelectByWriter(w).Encode(w))
|
encoder := json.NewEncoder(sc.codecSel.SelectByWriter(w).Encode(w))
|
||||||
// Not sure in which case will this happen. But seems harmless.
|
// Not sure in which case will this happen. But seems harmless.
|
||||||
if err := encoder.Encode(msg); nil != err {
|
if err := encoder.Encode(res); nil != err {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var serverMessagePool sync.Pool
|
|
||||||
|
|
||||||
func retainServerMessage(msgType protocol.MessageType, msg interface{}) *serverMessage {
|
|
||||||
var sm *serverMessage
|
|
||||||
v := serverMessagePool.Get()
|
|
||||||
if v == nil {
|
|
||||||
sm = &serverMessage{}
|
|
||||||
} else {
|
|
||||||
sm = v.(*serverMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
sm.Version = Version
|
|
||||||
sm.MessageType = msgType
|
|
||||||
sm.Message = msg
|
|
||||||
|
|
||||||
return sm
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseServerMessage(sm *serverMessage) {
|
|
||||||
sm.Version = ""
|
|
||||||
sm.MessageType = protocol.MessageTypeUnknown
|
|
||||||
sm.Message = nil
|
|
||||||
|
|
||||||
serverMessagePool.Put(sm)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
package json
|
package json
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// clientRequest represents a JSON-RPC request sent by a client.
|
// clientRequest represents a JSON-RPC request sent by a client.
|
||||||
type serverNotification struct {
|
type serverNotification struct {
|
||||||
// A String containing the name of the method to be invoked.
|
// A String containing the name of the method to be invoked.
|
||||||
|
@ -12,27 +8,3 @@ type serverNotification struct {
|
||||||
// Object to pass as request parameter to the method.
|
// Object to pass as request parameter to the method.
|
||||||
Params interface{} `json:"params"`
|
Params interface{} `json:"params"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var serverNotificationPool sync.Pool
|
|
||||||
|
|
||||||
func retainServerNotification(method string, args interface{}) *serverNotification {
|
|
||||||
var sn *serverNotification
|
|
||||||
v := serverNotificationPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
sn = &serverNotification{}
|
|
||||||
} else {
|
|
||||||
sn = v.(*serverNotification)
|
|
||||||
}
|
|
||||||
|
|
||||||
sn.Method = method
|
|
||||||
sn.Params = args
|
|
||||||
|
|
||||||
return sn
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseServerNotification(sn *serverNotification) {
|
|
||||||
sn.Method = ""
|
|
||||||
sn.Params = nil
|
|
||||||
|
|
||||||
serverNotificationPool.Put(sn)
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package json
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/codec"
|
"git.loafle.net/commons_go/rpc/codec"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
@ -37,7 +36,7 @@ type serverRequest struct {
|
||||||
// newRequestCodec returns a new ServerRequestCodec.
|
// newRequestCodec returns a new ServerRequestCodec.
|
||||||
func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) {
|
func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) {
|
||||||
// Decode the request body and check if RPC method is valid.
|
// Decode the request body and check if RPC method is valid.
|
||||||
req := retainServerRequest()
|
req := &serverRequest{}
|
||||||
err := json.NewDecoder(r).Decode(req)
|
err := json.NewDecoder(r).Decode(req)
|
||||||
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -57,7 +56,7 @@ func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerReque
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return retainServerRequestCodec(req, err, codec), nil
|
return &ServerRequestCodec{req: req, err: err, codec: codec}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerRequestCodec decodes and encodes a single request.
|
// ServerRequestCodec decodes and encodes a single request.
|
||||||
|
@ -67,14 +66,6 @@ type ServerRequestCodec struct {
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close is callback function that end of request.
|
|
||||||
func (src *ServerRequestCodec) Close() {
|
|
||||||
if nil != src.req {
|
|
||||||
releaseServerRequest(src.req)
|
|
||||||
}
|
|
||||||
releaseServerRequestCodec(src)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Method returns the RPC method for the current request.
|
// Method returns the RPC method for the current request.
|
||||||
//
|
//
|
||||||
// The method uses a dotted notation as in "Service.Method".
|
// The method uses a dotted notation as in "Service.Method".
|
||||||
|
@ -157,7 +148,7 @@ func (src *ServerRequestCodec) Params() ([]string, error) {
|
||||||
|
|
||||||
// WriteResponse encodes the response and writes it to the ResponseWriter.
|
// WriteResponse encodes the response and writes it to the ResponseWriter.
|
||||||
func (src *ServerRequestCodec) WriteResponse(w io.Writer, reply interface{}) error {
|
func (src *ServerRequestCodec) WriteResponse(w io.Writer, reply interface{}) error {
|
||||||
res := retainServerResponse(reply, nil, src.req.ID)
|
res := &serverResponse{Version: Version, Result: reply, ID: src.req.ID}
|
||||||
return src.writeServerResponse(w, res)
|
return src.writeServerResponse(w, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,69 +161,18 @@ func (src *ServerRequestCodec) WriteError(w io.Writer, status int, err error) er
|
||||||
Message: err.Error(),
|
Message: err.Error(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res := retainServerResponse(nil, jsonErr, src.req.ID)
|
res := &serverResponse{Version: Version, Error: jsonErr, ID: src.req.ID}
|
||||||
return src.writeServerResponse(w, res)
|
return src.writeServerResponse(w, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverResponse) error {
|
func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverResponse) error {
|
||||||
defer func() {
|
|
||||||
releaseServerResponse(res)
|
|
||||||
}()
|
|
||||||
// ID is null for notifications and they don't have a response.
|
// ID is null for notifications and they don't have a response.
|
||||||
if src.req.ID != nil {
|
if req.ID != nil {
|
||||||
msg := retainServerMessage(protocol.MessageTypeResponse, res)
|
|
||||||
defer func() {
|
|
||||||
releaseServerMessage(msg)
|
|
||||||
}()
|
|
||||||
encoder := json.NewEncoder(src.codec.Encode(w))
|
encoder := json.NewEncoder(src.codec.Encode(w))
|
||||||
// Not sure in which case will this happen. But seems harmless.
|
// Not sure in which case will this happen. But seems harmless.
|
||||||
if err := encoder.Encode(msg); nil != err {
|
if err := encoder.Encode(res); nil != err {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var serverRequestCodecPool sync.Pool
|
|
||||||
|
|
||||||
func retainServerRequestCodec(req *serverRequest, err error, codec codec.Codec) *ServerRequestCodec {
|
|
||||||
var src *ServerRequestCodec
|
|
||||||
v := serverRequestCodecPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
src = &ServerRequestCodec{}
|
|
||||||
} else {
|
|
||||||
src = v.(*ServerRequestCodec)
|
|
||||||
}
|
|
||||||
|
|
||||||
src.req = req
|
|
||||||
src.err = err
|
|
||||||
src.codec = codec
|
|
||||||
|
|
||||||
return src
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseServerRequestCodec(src *ServerRequestCodec) {
|
|
||||||
src.req = nil
|
|
||||||
src.err = nil
|
|
||||||
src.codec = nil
|
|
||||||
|
|
||||||
serverRequestCodecPool.Put(src)
|
|
||||||
}
|
|
||||||
|
|
||||||
var serverRequestPool sync.Pool
|
|
||||||
|
|
||||||
func retainServerRequest() *serverRequest {
|
|
||||||
v := serverRequestPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &serverRequest{}
|
|
||||||
}
|
|
||||||
return v.(*serverRequest)
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseServerRequest(sr *serverRequest) {
|
|
||||||
sr.Method = ""
|
|
||||||
sr.Params = nil
|
|
||||||
sr.ID = nil
|
|
||||||
|
|
||||||
serverRequestPool.Put(sr)
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
@ -11,6 +10,8 @@ import (
|
||||||
|
|
||||||
// serverResponse represents a JSON-RPC response returned by the server.
|
// serverResponse represents a JSON-RPC response returned by the server.
|
||||||
type serverResponse struct {
|
type serverResponse struct {
|
||||||
|
// JSON-RPC protocol.
|
||||||
|
Version string `json:"jsonrpc"`
|
||||||
// The Object that was returned by the invoked method. This must be null
|
// The Object that was returned by the invoked method. This must be null
|
||||||
// in case there was an error invoking the method.
|
// in case there was an error invoking the method.
|
||||||
// As per spec the member will be omitted if there was an error.
|
// As per spec the member will be omitted if there was an error.
|
||||||
|
@ -24,29 +25,3 @@ type serverResponse struct {
|
||||||
// This must be the same id as the request it is responding to.
|
// This must be the same id as the request it is responding to.
|
||||||
ID *json.RawMessage `json:"id,omitempty"`
|
ID *json.RawMessage `json:"id,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var serverResponsePool sync.Pool
|
|
||||||
|
|
||||||
func retainServerResponse(result interface{}, err *Error, id *json.RawMessage) *serverResponse {
|
|
||||||
var sr *serverResponse
|
|
||||||
v := serverResponsePool.Get()
|
|
||||||
if v == nil {
|
|
||||||
sr = &serverResponse{}
|
|
||||||
} else {
|
|
||||||
sr = v.(*serverResponse)
|
|
||||||
}
|
|
||||||
|
|
||||||
sr.Result = result
|
|
||||||
sr.Error = err
|
|
||||||
sr.ID = id
|
|
||||||
|
|
||||||
return sr
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseServerResponse(sr *serverResponse) {
|
|
||||||
sr.Result = nil
|
|
||||||
sr.Error = nil
|
|
||||||
sr.ID = nil
|
|
||||||
|
|
||||||
serverResponsePool.Put(sr)
|
|
||||||
}
|
|
||||||
|
|
|
@ -10,5 +10,4 @@ type RegistryCodec interface {
|
||||||
// Reads the request filling the RPC method args.
|
// Reads the request filling the RPC method args.
|
||||||
ReadParams(args []interface{}) error
|
ReadParams(args []interface{}) error
|
||||||
Params() ([]string, error)
|
Params() ([]string, error)
|
||||||
Close()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ type ServerCodec interface {
|
||||||
// serialization scheme.
|
// serialization scheme.
|
||||||
type ServerRequestCodec interface {
|
type ServerRequestCodec interface {
|
||||||
RegistryCodec
|
RegistryCodec
|
||||||
|
|
||||||
WriteResponse(w io.Writer, reply interface{}) error
|
WriteResponse(w io.Writer, reply interface{}) error
|
||||||
WriteError(w io.Writer, status int, err error) error
|
WriteError(w io.Writer, status int, err error) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,25 +11,25 @@ import (
|
||||||
cuc "git.loafle.net/commons_go/util/context"
|
cuc "git.loafle.net/commons_go/util/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewServlet(sh ServletHandler) Servlet {
|
func NewRPCServlet(sh RPCServletHandler) RPCServlet {
|
||||||
return &servlet{
|
return &rpcServlet{
|
||||||
sh: sh,
|
sh: sh,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Servlet interface {
|
type RPCServlet interface {
|
||||||
Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error
|
Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error
|
||||||
Stop()
|
Stop()
|
||||||
|
|
||||||
Send(method string, args ...interface{}) (err error)
|
Send(method string, args ...interface{}) (err error)
|
||||||
|
|
||||||
Context() ServletContext
|
Context() RPCServletContext
|
||||||
}
|
}
|
||||||
|
|
||||||
type servlet struct {
|
type rpcServlet struct {
|
||||||
ctx ServletContext
|
ctx RPCServletContext
|
||||||
sh ServletHandler
|
sh RPCServletHandler
|
||||||
messageQueueChan chan *messageState
|
responseQueueChan chan *responseState
|
||||||
|
|
||||||
doneChan chan<- error
|
doneChan chan<- error
|
||||||
conn interface{}
|
conn interface{}
|
||||||
|
@ -39,7 +39,7 @@ type servlet struct {
|
||||||
stopWg sync.WaitGroup
|
stopWg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *servlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error {
|
func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error {
|
||||||
if nil == s.sh {
|
if nil == s.sh {
|
||||||
panic("Servlet: servlet handler must be specified.")
|
panic("Servlet: servlet handler must be specified.")
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,7 @@ func (s *servlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<-
|
||||||
}
|
}
|
||||||
|
|
||||||
s.stopChan = make(chan struct{})
|
s.stopChan = make(chan struct{})
|
||||||
s.messageQueueChan = make(chan *messageState, s.sh.GetPendingMessages())
|
s.responseQueueChan = make(chan *responseState, s.sh.GetPendingResponses())
|
||||||
|
|
||||||
s.stopWg.Add(1)
|
s.stopWg.Add(1)
|
||||||
go handleServlet(s)
|
go handleServlet(s)
|
||||||
|
@ -72,7 +72,7 @@ func (s *servlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<-
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *servlet) Stop() {
|
func (s *rpcServlet) Stop() {
|
||||||
if s.stopChan == nil {
|
if s.stopChan == nil {
|
||||||
panic("Server: server must be started before stopping it")
|
panic("Server: server must be started before stopping it")
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ func (s *servlet) Stop() {
|
||||||
s.stopChan = nil
|
s.stopChan = nil
|
||||||
s.sh.Destroy(s.ctx)
|
s.sh.Destroy(s.ctx)
|
||||||
|
|
||||||
s.messageQueueChan = nil
|
s.responseQueueChan = nil
|
||||||
|
|
||||||
s.conn = nil
|
s.conn = nil
|
||||||
s.serverCodec = nil
|
s.serverCodec = nil
|
||||||
|
@ -89,21 +89,25 @@ func (s *servlet) Stop() {
|
||||||
logging.Logger().Info(fmt.Sprintf("Servlet is stopped"))
|
logging.Logger().Info(fmt.Sprintf("Servlet is stopped"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *servlet) Send(method string, args ...interface{}) (err error) {
|
func (s *rpcServlet) Send(method string, args ...interface{}) (err error) {
|
||||||
ms := retainMessageState(protocol.MessageTypeNotification)
|
noti := ¬ification{
|
||||||
ms.noti.method = method
|
method: method,
|
||||||
ms.noti.args = args
|
args: args,
|
||||||
|
}
|
||||||
|
rs := &responseState{
|
||||||
|
noti: noti,
|
||||||
|
}
|
||||||
|
|
||||||
s.messageQueueChan <- ms
|
s.responseQueueChan <- rs
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *servlet) Context() ServletContext {
|
func (s *rpcServlet) Context() RPCServletContext {
|
||||||
return s.ctx
|
return s.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleServlet(s *servlet) {
|
func handleServlet(s *rpcServlet) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -137,7 +141,7 @@ func handleServlet(s *servlet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleReader(s *servlet, stopChan chan struct{}, doneChan chan error) {
|
func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
|
||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
@ -149,7 +153,7 @@ func handleReader(s *servlet, stopChan chan struct{}, doneChan chan error) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
requestCodec, err := s.sh.GetRequest(s.ctx, s.serverCodec, s.conn)
|
requestCodec, err := s.sh.ReadRequest(s.ctx, s.serverCodec, s.conn)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
||||||
err = fmt.Errorf("RPC Server: disconnected from client")
|
err = fmt.Errorf("RPC Server: disconnected from client")
|
||||||
|
@ -171,108 +175,72 @@ func handleReader(s *servlet, stopChan chan struct{}, doneChan chan error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleWriter(s *servlet, stopChan chan struct{}, doneChan chan error) {
|
func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
|
||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = fmt.Errorf("RPC Server: Panic when writing message to client: %v", r)
|
err = fmt.Errorf("RPC Server: Panic when writing response to client: %v", r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
doneChan <- err
|
doneChan <- err
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var ms *messageState
|
var rs *responseState
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ms = <-s.messageQueueChan:
|
case rs = <-s.responseQueueChan:
|
||||||
default:
|
default:
|
||||||
// Give the last chance for ready goroutines filling s.messageQueueChan :)
|
// Give the last chance for ready goroutines filling s.responseQueueChan :)
|
||||||
runtime.Gosched()
|
runtime.Gosched()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-stopChan:
|
case <-stopChan:
|
||||||
err = fmt.Errorf("RPC Server: writing message stopped because get stop channel")
|
err = fmt.Errorf("RPC Server: writing message stopped because get stop channel")
|
||||||
return
|
return
|
||||||
case ms = <-s.messageQueueChan:
|
case rs = <-s.responseQueueChan:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch ms.messageType {
|
if nil != rs.requestCodec {
|
||||||
case protocol.MessageTypeResponse:
|
if err := s.sh.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err {
|
||||||
if err := s.sh.SendResponse(s.ctx, s.conn, ms.res.requestCodec, ms.res.result, ms.res.err); nil != err {
|
logging.Logger().Error(fmt.Sprintf("RPC Server: response error %v", err))
|
||||||
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
|
|
||||||
}
|
}
|
||||||
ms.res.requestCodec.Close()
|
} else {
|
||||||
case protocol.MessageTypeNotification:
|
if err := s.sh.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err {
|
||||||
if err := s.sh.SendNotification(s.ctx, s.conn, s.serverCodec, ms.noti.method, ms.noti.args...); nil != err {
|
logging.Logger().Error(fmt.Sprintf("RPC Server: notification error %v", err))
|
||||||
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
|
|
||||||
}
|
}
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) {
|
func handleRequest(s *rpcServlet, requestCodec protocol.ServerRequestCodec) {
|
||||||
defer func() {
|
defer func() {
|
||||||
s.stopWg.Done()
|
s.stopWg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
result, err := s.sh.Invoke(s.ctx, requestCodec)
|
result, err := s.sh.Invoke(s.ctx, requestCodec)
|
||||||
|
|
||||||
ms := retainMessageState(protocol.MessageTypeResponse)
|
rs := &responseState{
|
||||||
ms.res.requestCodec = requestCodec
|
requestCodec: requestCodec,
|
||||||
ms.res.result = result
|
result: result,
|
||||||
ms.res.err = err
|
err: err,
|
||||||
|
}
|
||||||
|
|
||||||
s.messageQueueChan <- ms
|
s.responseQueueChan <- rs
|
||||||
}
|
}
|
||||||
|
|
||||||
type messageState struct {
|
type responseState struct {
|
||||||
messageType protocol.MessageType
|
|
||||||
res messageResponse
|
|
||||||
noti messageNotification
|
|
||||||
}
|
|
||||||
|
|
||||||
type messageResponse struct {
|
|
||||||
requestCodec protocol.ServerRequestCodec
|
requestCodec protocol.ServerRequestCodec
|
||||||
result interface{}
|
result interface{}
|
||||||
|
noti *notification
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
type messageNotification struct {
|
type notification struct {
|
||||||
method string
|
method string
|
||||||
args []interface{}
|
args []interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var messageStatePool sync.Pool
|
|
||||||
|
|
||||||
func retainMessageState(messageType protocol.MessageType) *messageState {
|
|
||||||
var ms *messageState
|
|
||||||
v := messageStatePool.Get()
|
|
||||||
if v == nil {
|
|
||||||
ms = &messageState{}
|
|
||||||
} else {
|
|
||||||
ms = v.(*messageState)
|
|
||||||
}
|
|
||||||
|
|
||||||
ms.messageType = messageType
|
|
||||||
|
|
||||||
return ms
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseMessageState(ms *messageState) {
|
|
||||||
ms.messageType = protocol.MessageTypeUnknown
|
|
||||||
|
|
||||||
ms.res.requestCodec = nil
|
|
||||||
ms.res.result = nil
|
|
||||||
ms.res.err = nil
|
|
||||||
|
|
||||||
ms.noti.method = ""
|
|
||||||
ms.noti.args = nil
|
|
||||||
|
|
||||||
messageStatePool.Put(ms)
|
|
||||||
}
|
|
20
rpc_servlet_context.go
Normal file
20
rpc_servlet_context.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
cuc "git.loafle.net/commons_go/util/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RPCServletContext interface {
|
||||||
|
cuc.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
type rpcServletContext struct {
|
||||||
|
cuc.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRPCServletContext(parent cuc.Context) RPCServletContext {
|
||||||
|
sCTX := &rpcServletContext{}
|
||||||
|
sCTX.Context = cuc.NewContext(parent)
|
||||||
|
|
||||||
|
return sCTX
|
||||||
|
}
|
25
rpc_servlet_handler.go
Normal file
25
rpc_servlet_handler.go
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
cuc "git.loafle.net/commons_go/util/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RPCServletHandler interface {
|
||||||
|
ServletContext(parent cuc.Context) RPCServletContext
|
||||||
|
|
||||||
|
Init(servletCTX RPCServletContext) error
|
||||||
|
|
||||||
|
ReadRequest(servletCTX RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error)
|
||||||
|
Invoke(servletCTX RPCServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error)
|
||||||
|
WriteResponse(servletCTX RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error
|
||||||
|
WriteNotification(servletCTX RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error
|
||||||
|
|
||||||
|
Destroy(servletCTX RPCServletContext)
|
||||||
|
|
||||||
|
RegisterCodec(contentType string, codec protocol.ServerCodec)
|
||||||
|
getCodec(contentType string) (protocol.ServerCodec, error)
|
||||||
|
|
||||||
|
GetPendingResponses() int
|
||||||
|
Validate()
|
||||||
|
}
|
87
rpc_servlet_handlers.go
Normal file
87
rpc_servlet_handlers.go
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
cuc "git.loafle.net/commons_go/util/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RPCServletHandlers struct {
|
||||||
|
// The maximum number of pending messages in the queue.
|
||||||
|
//
|
||||||
|
// The number of pending requsts should exceed the expected number
|
||||||
|
// of concurrent goroutines calling client's methods.
|
||||||
|
// Otherwise a lot of ClientError.Overflow errors may appear.
|
||||||
|
//
|
||||||
|
// Default is DefaultPendingMessages.
|
||||||
|
PendingResponses int
|
||||||
|
|
||||||
|
codecs map[string]protocol.ServerCodec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) ServletContext(parent cuc.Context) RPCServletContext {
|
||||||
|
return newRPCServletContext(parent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) Init(servletCTX RPCServletContext) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) ReadRequest(servletCTX RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
|
||||||
|
return nil, fmt.Errorf("Servlet Handler: ReadRequest is not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) Invoke(servletCTX RPCServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
|
||||||
|
return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) WriteResponse(servletCTX RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
|
||||||
|
return fmt.Errorf("Servlet Handler: WriteResponse is not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) WriteNotification(servletCTX RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error {
|
||||||
|
return fmt.Errorf("Servlet Handler: WriteNotification is not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) Destroy(servletCTX RPCServletContext) {
|
||||||
|
// no op
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterCodec adds a new codec to the server.
|
||||||
|
//
|
||||||
|
// Codecs are defined to process a given serialization scheme, e.g., JSON or
|
||||||
|
// XML. A codec is chosen based on the "Content-Type" header from the request,
|
||||||
|
// excluding the charset definition.
|
||||||
|
func (sh *RPCServletHandlers) RegisterCodec(contentType string, codec protocol.ServerCodec) {
|
||||||
|
if nil == sh.codecs {
|
||||||
|
sh.codecs = make(map[string]protocol.ServerCodec)
|
||||||
|
}
|
||||||
|
sh.codecs[strings.ToLower(contentType)] = codec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) getCodec(contentType string) (protocol.ServerCodec, error) {
|
||||||
|
var codec protocol.ServerCodec
|
||||||
|
if contentType == "" && len(sh.codecs) == 1 {
|
||||||
|
// If Content-Type is not set and only one codec has been registered,
|
||||||
|
// then default to that codec.
|
||||||
|
for _, c := range sh.codecs {
|
||||||
|
codec = c
|
||||||
|
}
|
||||||
|
} else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil {
|
||||||
|
return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType)
|
||||||
|
}
|
||||||
|
|
||||||
|
return codec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) GetPendingResponses() int {
|
||||||
|
return sh.PendingResponses
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *RPCServletHandlers) Validate() {
|
||||||
|
if 0 >= sh.PendingResponses {
|
||||||
|
sh.PendingResponses = DefaultPendingResponses
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,5 +3,5 @@ package server
|
||||||
import "git.loafle.net/commons_go/rpc"
|
import "git.loafle.net/commons_go/rpc"
|
||||||
|
|
||||||
type ServletHandler interface {
|
type ServletHandler interface {
|
||||||
rpc.ServletHandler
|
rpc.RPCServletHandler
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,12 +9,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServletHandlers struct {
|
type ServletHandlers struct {
|
||||||
rpc.ServletHandlers
|
rpc.RPCServletHandlers
|
||||||
|
|
||||||
RPCRegistry rpc.Registry
|
RPCRegistry rpc.Registry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) {
|
func (sh *ServletHandlers) Invoke(servletCTX rpc.RPCServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
|
||||||
if !sh.RPCRegistry.HasMethod(requestCodec.Method()) {
|
if !sh.RPCRegistry.HasMethod(requestCodec.Method()) {
|
||||||
return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method())
|
return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method())
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result i
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServletHandlers) Validate() {
|
func (sh *ServletHandlers) Validate() {
|
||||||
sh.ServletHandlers.Validate()
|
sh.RPCServletHandlers.Validate()
|
||||||
|
|
||||||
if nil == sh.RPCRegistry {
|
if nil == sh.RPCRegistry {
|
||||||
logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified"))
|
logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified"))
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
package rpc
|
|
||||||
|
|
||||||
import (
|
|
||||||
cuc "git.loafle.net/commons_go/util/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ServletContext interface {
|
|
||||||
cuc.Context
|
|
||||||
}
|
|
||||||
|
|
||||||
type servletContext struct {
|
|
||||||
cuc.Context
|
|
||||||
}
|
|
||||||
|
|
||||||
func newServletContext(parent cuc.Context) ServletContext {
|
|
||||||
sCTX := &servletContext{}
|
|
||||||
sCTX.Context = cuc.NewContext(parent)
|
|
||||||
|
|
||||||
return sCTX
|
|
||||||
}
|
|
|
@ -1,26 +0,0 @@
|
||||||
package rpc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
|
||||||
cuc "git.loafle.net/commons_go/util/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ServletHandler interface {
|
|
||||||
ServletContext(parent cuc.Context) ServletContext
|
|
||||||
|
|
||||||
Init(servletCTX ServletContext) error
|
|
||||||
|
|
||||||
GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error)
|
|
||||||
Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error)
|
|
||||||
SendResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error
|
|
||||||
SendNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error
|
|
||||||
|
|
||||||
Destroy(servletCTX ServletContext)
|
|
||||||
|
|
||||||
RegisterCodec(contentType string, codec protocol.ServerCodec)
|
|
||||||
|
|
||||||
getCodec(contentType string) (protocol.ServerCodec, error)
|
|
||||||
|
|
||||||
GetPendingMessages() int
|
|
||||||
Validate()
|
|
||||||
}
|
|
|
@ -1,87 +0,0 @@
|
||||||
package rpc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
|
||||||
cuc "git.loafle.net/commons_go/util/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ServletHandlers struct {
|
|
||||||
// The maximum number of pending messages in the queue.
|
|
||||||
//
|
|
||||||
// The number of pending requsts should exceed the expected number
|
|
||||||
// of concurrent goroutines calling client's methods.
|
|
||||||
// Otherwise a lot of ClientError.Overflow errors may appear.
|
|
||||||
//
|
|
||||||
// Default is DefaultPendingMessages.
|
|
||||||
PendingMessages int
|
|
||||||
|
|
||||||
codecs map[string]protocol.ServerCodec
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) ServletContext(parent cuc.Context) ServletContext {
|
|
||||||
return newServletContext(parent)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) Init(servletCTX ServletContext) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
|
|
||||||
return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
|
|
||||||
return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) SendResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
|
|
||||||
return fmt.Errorf("Servlet Handler: SendResponse is not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) SendNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error {
|
|
||||||
return fmt.Errorf("Servlet Handler: SendNotification is not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) Destroy(servletCTX ServletContext) {
|
|
||||||
// no op
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterCodec adds a new codec to the server.
|
|
||||||
//
|
|
||||||
// Codecs are defined to process a given serialization scheme, e.g., JSON or
|
|
||||||
// XML. A codec is chosen based on the "Content-Type" header from the request,
|
|
||||||
// excluding the charset definition.
|
|
||||||
func (sh *ServletHandlers) RegisterCodec(contentType string, codec protocol.ServerCodec) {
|
|
||||||
if nil == sh.codecs {
|
|
||||||
sh.codecs = make(map[string]protocol.ServerCodec)
|
|
||||||
}
|
|
||||||
sh.codecs[strings.ToLower(contentType)] = codec
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) getCodec(contentType string) (protocol.ServerCodec, error) {
|
|
||||||
var codec protocol.ServerCodec
|
|
||||||
if contentType == "" && len(sh.codecs) == 1 {
|
|
||||||
// If Content-Type is not set and only one codec has been registered,
|
|
||||||
// then default to that codec.
|
|
||||||
for _, c := range sh.codecs {
|
|
||||||
codec = c
|
|
||||||
}
|
|
||||||
} else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil {
|
|
||||||
return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType)
|
|
||||||
}
|
|
||||||
|
|
||||||
return codec, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) GetPendingMessages() int {
|
|
||||||
return sh.PendingMessages
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServletHandlers) Validate() {
|
|
||||||
if 0 >= sh.PendingMessages {
|
|
||||||
sh.PendingMessages = DefaultPendingMessages
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user